1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 package com.sleepycat.je.rep.impl.node;
9 
10 import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED;
11 import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER;
12 import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA;
13 import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN;
14 import static com.sleepycat.je.rep.impl.RepParams.DBTREE_CACHE_CLEAR_COUNT;
15 import static com.sleepycat.je.rep.impl.RepParams.ENV_CONSISTENCY_TIMEOUT;
16 import static com.sleepycat.je.rep.impl.RepParams.HEARTBEAT_INTERVAL;
17 import static com.sleepycat.je.rep.impl.RepParams.IGNORE_SECONDARY_NODE_ID;
18 import static com.sleepycat.je.rep.impl.RepParams.LOG_FLUSH_TASK_INTERVAL;
19 import static com.sleepycat.je.rep.impl.RepParams.NODE_TYPE;
20 import static com.sleepycat.je.rep.impl.RepParams.REPLAY_COST_PERCENT;
21 import static com.sleepycat.je.rep.impl.RepParams.REPLAY_FREE_DISK_PERCENT;
22 import static com.sleepycat.je.rep.impl.RepParams.RESET_REP_GROUP_RETAIN_UUID;
23 import static com.sleepycat.je.rep.impl.RepParams.RUN_LOG_FLUSH_TASK;
24 
25 import java.io.IOException;
26 import java.net.InetAddress;
27 import java.net.InetSocketAddress;
28 import java.util.BitSet;
29 import java.util.Date;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.Map;
33 import java.util.Map.Entry;
34 import java.util.NavigableSet;
35 import java.util.Set;
36 import java.util.Timer;
37 import java.util.TreeSet;
38 import java.util.UUID;
39 import java.util.concurrent.TimeUnit;
40 import java.util.logging.Level;
41 import java.util.logging.Logger;
42 
43 import com.sleepycat.je.CheckpointConfig;
44 import com.sleepycat.je.DatabaseException;
45 import com.sleepycat.je.EnvironmentFailureException;
46 import com.sleepycat.je.JEVersion;
47 import com.sleepycat.je.RecoveryProgress;
48 import com.sleepycat.je.ReplicaConsistencyPolicy;
49 import com.sleepycat.je.StatsConfig;
50 import com.sleepycat.je.cleaner.Cleaner;
51 import com.sleepycat.je.cleaner.FileSelector;
52 import com.sleepycat.je.cleaner.FileSummary;
53 import com.sleepycat.je.dbi.DbConfigManager;
54 import com.sleepycat.je.dbi.StartupTracker.Phase;
55 import com.sleepycat.je.log.FileManager;
56 import com.sleepycat.je.log.LogEntryType;
57 import com.sleepycat.je.log.LogManager;
58 import com.sleepycat.je.rep.AppStateMonitor;
59 import com.sleepycat.je.rep.GroupShutdownException;
60 import com.sleepycat.je.rep.InsufficientLogException;
61 import com.sleepycat.je.rep.MasterStateException;
62 import com.sleepycat.je.rep.MasterTransferFailureException;
63 import com.sleepycat.je.rep.MemberActiveException;
64 import com.sleepycat.je.rep.MemberNotFoundException;
65 import com.sleepycat.je.rep.NodeType;
66 import com.sleepycat.je.rep.QuorumPolicy;
67 import com.sleepycat.je.rep.RepInternal;
68 import com.sleepycat.je.rep.ReplicaConsistencyException;
69 import com.sleepycat.je.rep.ReplicaStateException;
70 import com.sleepycat.je.rep.ReplicatedEnvironment;
71 import com.sleepycat.je.rep.ReplicatedEnvironmentStats;
72 import com.sleepycat.je.rep.ReplicationNode;
73 import com.sleepycat.je.rep.RestartRequiredException;
74 import com.sleepycat.je.rep.UnknownMasterException;
75 import com.sleepycat.je.rep.arbitration.Arbiter;
76 import com.sleepycat.je.rep.elections.Elections;
77 import com.sleepycat.je.rep.elections.Proposer.Proposal;
78 import com.sleepycat.je.rep.elections.TimebasedProposalGenerator;
79 import com.sleepycat.je.rep.impl.BinaryNodeStateProtocol;
80 import com.sleepycat.je.rep.impl.BinaryNodeStateProtocol.BinaryNodeStateResponse;
81 import com.sleepycat.je.rep.impl.BinaryNodeStateService;
82 import com.sleepycat.je.rep.impl.GroupService;
83 import com.sleepycat.je.rep.impl.MinJEVersionUnsupportedException;
84 import com.sleepycat.je.rep.impl.NodeStateService;
85 import com.sleepycat.je.rep.impl.PointConsistencyPolicy;
86 import com.sleepycat.je.rep.impl.RepGroupDB;
87 import com.sleepycat.je.rep.impl.RepGroupImpl;
88 import com.sleepycat.je.rep.impl.RepGroupImpl.NodeConflictException;
89 import com.sleepycat.je.rep.impl.RepGroupProtocol;
90 import com.sleepycat.je.rep.impl.RepGroupProtocol.GroupResponse;
91 import com.sleepycat.je.rep.impl.RepImpl;
92 import com.sleepycat.je.rep.impl.RepNodeImpl;
93 import com.sleepycat.je.rep.impl.RepParams;
94 import com.sleepycat.je.rep.impl.TextProtocol.MessageExchange;
95 import com.sleepycat.je.rep.impl.TextProtocol.ResponseMessage;
96 import com.sleepycat.je.rep.monitor.LeaveGroupEvent.LeaveReason;
97 import com.sleepycat.je.rep.net.DataChannel;
98 import com.sleepycat.je.rep.net.DataChannelFactory.ConnectOptions;
99 import com.sleepycat.je.rep.stream.FeederTxns;
100 import com.sleepycat.je.rep.stream.MasterChangeListener;
101 import com.sleepycat.je.rep.stream.MasterStatus;
102 import com.sleepycat.je.rep.stream.MasterSuggestionGenerator;
103 import com.sleepycat.je.rep.util.ldiff.LDiffService;
104 import com.sleepycat.je.rep.utilint.RepUtils;
105 import com.sleepycat.je.rep.utilint.RepUtils.ExceptionAwareCountDownLatch;
106 import com.sleepycat.je.rep.utilint.ServiceDispatcher;
107 import com.sleepycat.je.rep.vlsn.VLSNIndex;
108 import com.sleepycat.je.utilint.FileStoreInfo;
109 import com.sleepycat.je.utilint.LoggerUtils;
110 import com.sleepycat.je.utilint.StoppableThread;
111 import com.sleepycat.je.utilint.TestHook;
112 import com.sleepycat.je.utilint.TestHookExecute;
113 import com.sleepycat.je.utilint.VLSN;
114 
115 /**
116  * Represents a replication node. This class is the locus of operations that
117  * manage the state of the node, master, replica, etc. Once the state of a node
118  * has been established the thread of control passes over to the Replica or
119  * FeederManager instances.
120  *
121  * Note that both Feeders and the Replica instance may be active in future when
122  * we support r2r replication, in addition to m2r replication. For now however,
123  * either the FeederManager is active, or the Replica is and the same common
124  * thread control can be shared between the two.
125  */
126 public class RepNode extends StoppableThread {
127 
128     /*
129      * The unique node name and internal id that identifies the node within
130      * the rep group. There is a canonical instance of this that's updated
131      * when the node joins the group.
132      */
133     private final NameIdPair nameIdPair;
134 
135     /*
136      * The socket address on which Replicas connect to me, were this node
137      * to become the master.
138      */
139     private final InetSocketAddress mySocket;
140 
141     /* The service dispatcher used by this replication node. */
142     private final ServiceDispatcher serviceDispatcher;
143 
144     /* The election instance for this node */
145     private Elections elections;
146 
147     /* The locus of operations when the node is a replica. */
148     private final Replica replica;
149 
150     /* Used when the node is a feeder. */
151     private FeederManager feederManager;
152 
153     /*
154      * The status of the Master. Note that this is the leading state as
155      * communicated to this node via the Listener. The node itself may not as
156      * yet have responded to this state change announced by the Listener. That
157      * is, nodeState, may reflect a different state until the transition to
158      * this state has been completed.
159      */
160     private final MasterStatus masterStatus;
161     private final MasterChangeListener changeListener;
162     private final MasterSuggestionGenerator suggestionGenerator;
163 
164     /*
165      * Represents the application visible state of this node. It may lag the
166      * state as described by masterStatus.
167      */
168     private final NodeState nodeState;
169 
170     private final RepImpl repImpl;
171 
172     /* The encapsulated internal replication group database. */
173     final RepGroupDB repGroupDB;
174 
175     /*
176      * The latch used to indicate that the node has a well defined state as a
177      * Master or Replica and has finished the node-specific initialization that
178      * will permit it to function immediately in that capacity.
179      *
180      * For a Master it means that it's ready to start accepting connections
181      * from Replicas.
182      *
183      * For a Replica, it means that it has established a connection with a
184      * Feeder, completed the handshake process that validates it as being a
185      * legitimate member of the group, established a sync point, and is ready
186      * to start replaying the replication stream.
187      */
188     private volatile ExceptionAwareCountDownLatch readyLatch = null;
189 
190     /*
191      * Latch used to freeze txn commit VLSN advancement during an election.
192      */
193     private final CommitFreezeLatch vlsnFreezeLatch = new CommitFreezeLatch();
194 
    /*
     * Describes the nodes that form the group. This information is dynamic:
     * it's initialized at startup and subsequently updated as a result of
     * changes made either directly to it, when the node is a master, or via
     * the replication stream, when it is a Replica.
     */
201     volatile private RepGroupImpl group;
202 
203     /*
204      * Determines the election policy to use when the node holds its very first
205      * elections
206      */
207     private QuorumPolicy electionQuorumPolicy = QuorumPolicy.SIMPLE_MAJORITY;
208 
    /*
     * Amount of time to sleep between retries when a new node tries to locate
     * a master.
     */
213     private static final int MASTER_QUERY_INTERVAL = 10000;
214 
215     /* Number of times to retry joining on a retryable exception. */
216     private static final int JOIN_RETRIES = 10;
217 
218     /*
219      * Encapsulates access to current time, to arrange for testing of clock
220      * skews.
221      */
222     private final Clock clock;
223 
224     private com.sleepycat.je.rep.impl.networkRestore.FeederManager
225         logFeederManager;
226     private LDiffService ldiff;
227     private NodeStateService nodeStateService;
228     private BinaryNodeStateService binaryNodeStateService;
229     private GroupService groupService;
230 
231     /* tracks the local CBVLSN for this node. */
232     final LocalCBVLSNTracker cbvlsnTracker;
233 
234     /* The currently in-progress Master Transfer operation, if any. */
235     private MasterTransfer xfrInProgress;
236 
237     /* calculates and manages the global, cached CBVLSN */
238     final GlobalCBVLSN globalCBVLSN;
239 
240     /* Determines how long to wait for a replica to catch up on a close. */
241     private long replicaCloseCatchupMs = -1;
242 
243     /* Manage and notify MonitorChangeEvents fired by this RepNode. */
244     private MonitorEventManager monitorEventManager;
245 
246     /* The user defined AppStateMonitor which gets the application state. */
247     private AppStateMonitor appStateMonitor;
248 
249     /* A timer used to track inactive socket channels used by the RepNode. */
250     private final Timer timer;
251     private final ChannelTimeoutTask channelTimeoutTask;
252     private LogFlusher logFlusher;
253 
254     final Logger logger;
255 
256     /* Locus of election and durability quorum decisions */
257     private final ElectionQuorum electionQuorum;
258     private final DurabilityQuorum durabilityQuorum;
259 
260     private final Arbiter arbiter;
261     private final NodeType nodeType;
262 
263     /** Manages the allocation of node IDs for secondary nodes. */
264     private final SecondaryNodeIds secondaryNodeIds =
265         new SecondaryNodeIds(RepGroupImpl.MAX_SECONDARY_NODES);
266 
267     /**
268      * Synchronize on this object when setting the minimum JE version or adding
269      * a secondary node, which could change the JE versions of the nodes to
270      * check when setting a new minimum.
271      *
272      * @see #setMinJEVersion
273      * @see #addSecondaryNode
274      */
275     private final Object minJEVersionLock = new Object();
276 
277     /**
278      * The relative cost of replay as compared with network restore, as
279      * specified by
280      * {@link com.sleepycat.je.rep.ReplicationConfig#REPLAY_COST_PERCENT}.
281      */
282     private final int replayCostPercent;
283 
284     /**
285      * The percentage of free disk space to maintain when choosing files to
286      * retain based on replay cost, as specified by {@link
287      * com.sleepycat.je.rep.ReplicationConfig#REPLAY_FREE_DISK_PERCENT}.
288      */
289     private final int replayFreeDiskPercent;
290 
291     /**
292      * The minimum VLSN that should be retained as requested by
293      * replayCostPercent and replayFreeDiskPercent, or NULL_VLSN if disabled.
294      */
295     private volatile VLSN replayCostMinVLSN = VLSN.NULL_VLSN;
296 
297     /* Used by tests only. */
298     private int logVersion = LogEntryType.LOG_VERSION;
299 
300     /* For unit testing */
301     private Set<TestHook<Integer>> convertHooks;
302 
RepNode(RepImpl repImpl, Replay replay, NodeState nodeState)303     public RepNode(RepImpl repImpl,
304                    Replay replay,
305                    NodeState nodeState)
306         throws IOException, DatabaseException {
307 
308         super(repImpl, "RepNode " + repImpl.getNameIdPair());
309 
310         this.repImpl = repImpl;
311         readyLatch = new ExceptionAwareCountDownLatch(repImpl, 1);
312         nameIdPair = repImpl.getNameIdPair();
313         logger = LoggerUtils.getLogger(getClass());
314 
315         this.mySocket = repImpl.getSocket();
316         this.serviceDispatcher =
317             new ServiceDispatcher(mySocket, repImpl,
318                                   repImpl.getChannelFactory());
319         serviceDispatcher.start();
320         clock = new Clock(RepImpl.getClockSkewMs());
321         this.repGroupDB = new RepGroupDB(repImpl);
322 
323         masterStatus = new MasterStatus(nameIdPair);
324         replica = ReplicaFactory.create(this, replay);
325 
326         feederManager = new FeederManager(this);
327         changeListener = new MasterChangeListener(this);
328         suggestionGenerator = new MasterSuggestionGenerator(this);
329 
330         this.nodeState = nodeState;
331 
332         electionQuorum = new ElectionQuorum(repImpl);
333         durabilityQuorum = new DurabilityQuorum(repImpl);
334 
335         utilityServicesStart();
336         this.cbvlsnTracker = new LocalCBVLSNTracker(this);
337         this.globalCBVLSN = new GlobalCBVLSN(this);
338         this.monitorEventManager = new MonitorEventManager(this);
339         timer = new Timer(true);
340         channelTimeoutTask = new ChannelTimeoutTask(timer);
341         configLogFlusher(getConfigManager());
342 
343         arbiter = new Arbiter(repImpl);
344         nodeType = NodeType.valueOf(getConfigManager().get(NODE_TYPE));
345         replayCostPercent = getConfigManager().getInt(REPLAY_COST_PERCENT);
346         replayFreeDiskPercent = getReplayFreeDiskPercentParameter();
347     }
348 
utilityServicesStart()349     private void utilityServicesStart() {
350         ldiff = new LDiffService(serviceDispatcher, repImpl);
351         logFeederManager =
352             new com.sleepycat.je.rep.impl.networkRestore.FeederManager
353             (serviceDispatcher, repImpl, nameIdPair);
354 
355         /* Register the node state querying service. */
356         nodeStateService = new NodeStateService(serviceDispatcher, this);
357         serviceDispatcher.register(nodeStateService);
358 
359         binaryNodeStateService =
360             new BinaryNodeStateService(serviceDispatcher, this);
361         groupService = new GroupService(serviceDispatcher, this);
362         serviceDispatcher.register(groupService);
363     }
364 
getReplayFreeDiskPercentParameter()365     private int getReplayFreeDiskPercentParameter() {
366         final int value = getConfigManager().getInt(REPLAY_FREE_DISK_PERCENT);
367         if (value == 0) {
368             return 0;
369         }
370         try {
371             FileStoreInfo.checkSupported();
372             return value;
373         } catch (UnsupportedOperationException e) {
374             LoggerUtils.warning(
375                 logger, repImpl,
376                 "The " + REPLAY_FREE_DISK_PERCENT.getName() +
377                 " parameter was specified, but is not supported: " +
378                 e.getMessage());
379             return 0;
380         }
381     }
382 
383     /* Create a placeholder node, for test purposes only. */
RepNode(NameIdPair nameIdPair)384     public RepNode(NameIdPair nameIdPair) {
385         this(nameIdPair, null);
386     }
387 
RepNode()388     public RepNode() {
389         this(NameIdPair.NULL);
390     }
391 
RepNode(NameIdPair nameIdPair, ServiceDispatcher serviceDispatcher)392     public RepNode(NameIdPair nameIdPair,
393                    ServiceDispatcher serviceDispatcher) {
394         super("RepNode " + nameIdPair);
395         repImpl = null;
396         clock = new Clock(0);
397 
398         this.nameIdPair = nameIdPair;
399         mySocket = null;
400         this.serviceDispatcher = serviceDispatcher;
401 
402         this.repGroupDB = null;
403 
404         masterStatus = new MasterStatus(NameIdPair.NULL);
405         replica = null;
406         feederManager = null;
407         changeListener = null;
408         suggestionGenerator = null;
409         nodeState = null;
410         cbvlsnTracker = null;
411         globalCBVLSN = null;
412         logger = null;
413         timer = null;
414         channelTimeoutTask = null;
415         electionQuorum = null;
416         durabilityQuorum = null;
417         arbiter = null;
418         nodeType = NodeType.ELECTABLE;
419         replayCostPercent = 0;
420         replayFreeDiskPercent = 0;
421     }
422 
423     @Override
getLogger()424     public Logger getLogger() {
425         return logger;
426     }
427 
428     /**
429      * Returns the node type of this node.
430      */
getNodeType()431     public NodeType getNodeType() {
432         return nodeType;
433     }
434 
435     /**
436      * Returns the timer associated with this RepNode
437      */
getTimer()438     public Timer getTimer() {
439         return timer;
440     }
441 
getServiceDispatcher()442     public ServiceDispatcher getServiceDispatcher() {
443         return serviceDispatcher;
444     }
445 
446     /**
447      * Returns the accumulated statistics for this node. The method
448      * encapsulates the statistics associated with its two principal components
449      * the FeederManager and the Replica.
450      */
getStats(StatsConfig config)451     public ReplicatedEnvironmentStats getStats(StatsConfig config) {
452         return RepInternal.makeReplicatedEnvironmentStats(repImpl, config);
453     }
454 
resetStats()455     public void resetStats() {
456         feederManager.resetStats();
457         replica.resetStats();
458     }
459 
getReadyLatch()460     public ExceptionAwareCountDownLatch getReadyLatch() {
461         return readyLatch;
462     }
463 
getVLSNFreezeLatch()464     public CommitFreezeLatch getVLSNFreezeLatch() {
465         return vlsnFreezeLatch;
466     }
467 
resetReadyLatch(Exception exception)468     public void resetReadyLatch(Exception exception) {
469         ExceptionAwareCountDownLatch old = readyLatch;
470         readyLatch = new ExceptionAwareCountDownLatch(repImpl, 1);
471         if (old.getCount() != 0) {
472             /* releasing latch in some error situation. */
473             old.releaseAwait(exception);
474         }
475     }
476 
477     /* The methods below return the components of the rep node. */
feederManager()478     public FeederManager feederManager() {
479         return feederManager;
480     }
481 
replica()482     public Replica replica() {
483         return replica;
484     }
485 
getClock()486     public Clock getClock() {
487         return clock;
488     }
489 
getReplica()490     public Replica getReplica() {
491         return replica;
492     }
493 
getRepGroupDB()494     public RepGroupDB getRepGroupDB() {
495         return repGroupDB;
496     }
497 
498     /**
499      * Retrieves the node's current snapshot image of the group definition.
500      * <p>
501      * There is a very brief period of time, during node start-up, where this
502      * can be <code>null</code>.  But after that it should always return a
503      * valid object.
504      */
getGroup()505     public RepGroupImpl getGroup() {
506         return group;
507     }
508 
509     /**
510      * Returns the UUID associated with the replicated environment.
511      */
getUUID()512     public UUID getUUID() {
513         if (group == null) {
514             throw EnvironmentFailureException.unexpectedState
515                 ("Group info is not available");
516         }
517         return group.getUUID();
518     }
519 
520     /**
521      * Returns the nodeName associated with this replication node.
522      *
523      * @return the nodeName
524      */
getNodeName()525     public String getNodeName() {
526         return nameIdPair.getName();
527     }
528 
529     /**
530      * Returns the nodeId associated with this replication node.
531      *
532      * @return the nodeId
533      */
getNodeId()534     public int getNodeId() {
535         return nameIdPair.getId();
536     }
537 
getNameIdPair()538     public NameIdPair getNameIdPair() {
539         return nameIdPair;
540     }
541 
getSocket()542     public InetSocketAddress getSocket() {
543         return mySocket;
544     }
545 
getMasterStatus()546     public MasterStatus getMasterStatus() {
547         return masterStatus;
548     }
549 
550     /**
551      * Returns a definitive answer to whether this node is currently the master
552      * by checking both its status as a master and whether the group agrees
553      * that it is the master.
554      *
555      * Such an authoritative answer is needed in a network partition situation
556      * to detect a master that may be isolated on the minority side of a
557      * network partition.
558      *
559      * @return true if the node is definitely the master. False if it's not or
560      * we cannot be sure.
561      */
isAuthoritativeMaster()562     public boolean isAuthoritativeMaster() {
563         return (electionQuorum.isAuthoritativeMaster(getMasterStatus(),
564                                                      feederManager));
565     }
566 
getHeartbeatInterval()567     public int getHeartbeatInterval() {
568         return getConfigManager().getInt(HEARTBEAT_INTERVAL);
569     }
570 
571     /* For unit testing only. */
setVersion(int version)572     public void setVersion(int version) {
573         logVersion = version;
574     }
575 
getLogVersion()576     public int getLogVersion() {
577         return logVersion;
578     }
579 
getElectionPriority()580     public int getElectionPriority() {
581         final int priority =
582             getConfigManager().getInt(RepParams.NODE_PRIORITY);
583         final int defaultPriority =
584             Integer.parseInt(RepParams.NODE_PRIORITY.getDefault());
585         return (getConfigManager().getBoolean(RepParams.DESIGNATED_PRIMARY) &&
586                 (priority == defaultPriority)) ?
587             defaultPriority + 1 : /* Raise its priority. */
588             priority; /* Explicit priority, leave it intact. */
589     }
590 
591     /*
592      * Amount of time to wait for a thread to finish on a shutdown. It's
593      * a multiple of a heartbeat, since a thread typically polls for a
594      * shutdown once per heartbeat.
595      */
getThreadWaitInterval()596     public int getThreadWaitInterval() {
597         return getHeartbeatInterval()*4;
598     }
599 
getDbTreeCacheClearingOpCount()600     int getDbTreeCacheClearingOpCount() {
601         return getConfigManager().getInt(DBTREE_CACHE_CLEAR_COUNT);
602     }
603 
getRepImpl()604     public RepImpl getRepImpl() {
605         return repImpl;
606     }
607 
getLogManager()608     public LogManager getLogManager() {
609         return repImpl.getLogManager();
610     }
611 
getConfigManager()612     DbConfigManager getConfigManager() {
613         return repImpl.getConfigManager();
614     }
615 
getVLSNIndex()616     public VLSNIndex getVLSNIndex() {
617         return repImpl.getVLSNIndex();
618     }
619 
getFeederTxns()620     public FeederTxns getFeederTxns() {
621         return repImpl.getFeederTxns();
622     }
623 
getElections()624     public Elections getElections() {
625         return elections;
626     }
627 
getSuggestionGenerator()628     public MasterSuggestionGenerator getSuggestionGenerator() {
629         return suggestionGenerator;
630     }
631 
632     /* Used by unit tests only. */
getElectionPolicy()633     public QuorumPolicy getElectionPolicy() {
634         return electionQuorumPolicy;
635     }
636 
637     /**
638      * Returns an array of nodes suitable for feeding log files for a network
639      * restore.
640      *
641      * @return an array of feeder nodes
642      */
getLogProviders()643     public RepNodeImpl[] getLogProviders() {
644         final Set<RepNodeImpl> nodes = getGroup().getDataMembers();
645         return nodes.toArray(new RepNodeImpl[nodes.size()]);
646     }
647 
648     /* Used by unit tests only. */
getLogFlusher()649     public LogFlusher getLogFlusher() {
650         return logFlusher;
651     }
652 
653     /* Configure the log flusher according to the configuration changes. */
configLogFlusher(DbConfigManager configMgr)654     public void configLogFlusher(DbConfigManager configMgr) {
655         boolean enableTask = configMgr.getBoolean(RUN_LOG_FLUSH_TASK);
656         int flushInterval = configMgr.getDuration(LOG_FLUSH_TASK_INTERVAL);
657 
658         /* Cancel the log flushing the task if we want to. */
659         if (!enableTask) {
660             if (logFlusher != null) {
661                 logFlusher.cancelTask();
662             }
663 
664             return;
665         }
666 
667         /* Create LogFlusher if it's null and we do want to start the task. */
668         if (logFlusher == null) {
669             logFlusher = new LogFlusher(this, timer);
670         }
671 
672         /* Configure the flushing task. */
673         logFlusher.configFlushTask(flushInterval);
674     }
675 
getChannelTimeoutTask()676     public ChannelTimeoutTask getChannelTimeoutTask() {
677         return channelTimeoutTask;
678     }
679 
isMaster()680     public boolean isMaster() {
681         return masterStatus.isNodeMaster();
682     }
683 
getMonitorEventManager()684     public MonitorEventManager getMonitorEventManager() {
685         return monitorEventManager;
686     }
687 
688     /**
689      * Register an AppStateMonitor with this RepNode.
690      */
registerAppStateMonitor(AppStateMonitor stateMonitor)691     public void registerAppStateMonitor(AppStateMonitor stateMonitor) {
692         this.appStateMonitor = stateMonitor;
693     }
694 
695     /**
696      * Return the application state that defined in user specified
697      * AppStateMonitor.
698      */
getAppState()699     public byte[] getAppState() {
700 
701         /*
702          * If the AppStateMonitor is not defined, or there is currently no
703          * returned application state, return null.
704          */
705         if (appStateMonitor == null || appStateMonitor.getAppState() == null) {
706             return null;
707         }
708 
709         /* Application state shouldn't be a zero length byte array. */
710         if (appStateMonitor.getAppState().length == 0) {
711             throw new IllegalStateException
712                 ("Application state should be a byte array larger than 0.");
713         }
714 
715         return appStateMonitor.getAppState();
716     }
717 
718     /* Get the current master name if it exists. */
getMasterName()719     public String getMasterName() {
720         if (masterStatus.getGroupMasterNameId().getId() ==
721             NameIdPair.NULL_NODE_ID) {
722             return null;
723         }
724 
725         return masterStatus.getGroupMasterNameId().getName();
726     }
727 
728     /**
729      * Returns the latest VLSN associated with a replicated commit. Note that
730      * since the lastTxnEndVLSN is computed outside the write log latch, via
731      * EnvironmentImpl.registerVLSN(LogItem) it's possible for it to be behind
732      * on an instantaneous basis, but it will eventually catch up when the
733      * updates quiesce.
734      */
getCurrentTxnEndVLSN()735     public VLSN getCurrentTxnEndVLSN() {
736         return repImpl.getLastTxnEnd();
737     }
738 
739     /*
740      * Testing API used to force this node as a master. The mastership is
741      * communicated upon election completion via the Listener. It's the
742      * responsibility of the caller to ensure that only one node is forced
743      * at a time via this API.
744      *
745      * @param force true to force this node as the master, false reverts back
746      *              to use of normal (non-preemptive) elections.
747      */
forceMaster(boolean force)748     public void forceMaster(boolean force)
749         throws InterruptedException, DatabaseException {
750 
751         suggestionGenerator.forceMaster(force);
752         /* Initiate elections to make the changed proposal heard. */
753         refreshCachedGroup();
754         elections.initiateElection(group, electionQuorumPolicy);
755     }
756 
757     /**
758      * Starts up the thread in which the node does its processing as a master
759      * or replica. It then waits for the newly started thread to transition it
760      * out of the DETACHED state, and returns upon completion of this
761      * transition.
762      *
763      * @throws DatabaseException
764      */
startup(QuorumPolicy initialElectionPolicy)765     private void startup(QuorumPolicy initialElectionPolicy)
766         throws DatabaseException {
767 
768         if (isAlive()) {
769             return;
770         }
771 
772         if (nodeState.getRepEnvState().isDetached()) {
773             nodeState.changeAndNotify(UNKNOWN, NameIdPair.NULL);
774         }
775         elections = new Elections(this,
776                                   changeListener,
777                                   suggestionGenerator);
778 
779         repImpl.getStartupTracker().start(Phase.FIND_MASTER);
780         try {
781 
782             if (repImpl.getConfigManager().
783                 getBoolean(RepParams.RESET_REP_GROUP)) {
784                 /* Invoked by DbResetRepGroup utility */
785                 reinitSelfElect();
786             } else {
787                 findMaster();
788             }
789             this.electionQuorumPolicy = initialElectionPolicy;
790 
791             /* Electable members should participate in elections */
792             if (electionQuorum.nodeTypeParticipates(nodeType)) {
793                 elections.participate();
794             }
795         } finally {
796             repImpl.getStartupTracker().stop(Phase.FIND_MASTER);
797         }
798 
799         start();
800     }
801 
802     /**
803      * This method must be invoked when a RepNode is first initialized and
804      * subsequently every time there is a change to the replication group.
805      * <p>
806      * The Master should invoke this method each time a member is added or
807      * removed, and a replica should invoke it each time it detects the commit
808      * of a transaction that modifies the membership database.
809      * <p>
810      * In addition, it must be invoked after a syncup operation, since it may
811      * revert changes made to the membership table.
812      *
813      * @throws DatabaseException
814      */
refreshCachedGroup()815     public RepGroupImpl refreshCachedGroup()
816         throws DatabaseException {
817 
818         group = repGroupDB.getGroup();
819         elections.updateRepGroup(group);
820         if (nameIdPair.hasNullId()) {
821             RepNodeImpl n = group.getMember(nameIdPair.getName());
822             if (n != null) {
823 
824                 /*
825                  * Don't update the node ID for a secondary node if
826                  * IGNORE_SECONDARY_NODE_ID is true.  In that case, we are
827                  * trying to convert a previously electable node to a secondary
828                  * node, so the information about the electable node ID in the
829                  * local copy of the rep group DB should be ignored.
830                  */
831                 if (!nodeType.isSecondary() ||
832                     !getConfigManager().getBoolean(IGNORE_SECONDARY_NODE_ID)) {
833                     /* May not be sufficiently current in the rep stream. */
834                     nameIdPair.update(n.getNameIdPair());
835                 }
836             }
837         }
838         return group;
839     }
840 
841     /**
842      * Removes a node so that it's no longer a member of the group.
843      *
844      * Note that names referring to removed nodes cannot be reused.
845      *
846      * @param nodeName identifies the node to be removed
847      *
     * @throws MemberNotFoundException if the node denoted by
     * <code>nodeName</code> is not a member of the replication group.
850      *
851      * @throws MasterStateException if the member being removed is currently
852      * the Master
853      *
854      * @see <a href="https://sleepycat.oracle.com/trac/wiki/DynamicGroupMembership#DeletingMembers">Member Deletion</a>
855      */
removeMember(String nodeName)856     public void removeMember(String nodeName) {
857         removeMember(nodeName, false);
858     }
859 
860     /**
861      * Remove or delete a node from the group.  If deleting a node, the node
862      * must not be active.
863      *
864      * <p>Note that names referring to removed nodes cannot be reused, but
865      * names for deleted nodes can be.
866      *
867      * @param nodeName identifies the node to be removed or deleted
868      *
869      * @param delete whether to delete the node rather than just remove it
870      *
871      * @throws MemberActiveException if {@code delete} is {@code true} and
872      * the node is currently active
873      *
     * @throws MemberNotFoundException if the node denoted by
     * <code>nodeName</code> is not a member of the replication group.
876      *
877      * @throws MasterStateException if the member being removed or deleted is
878      * currently the Master
879      */
removeMember(String nodeName, boolean delete)880     public void removeMember(String nodeName, boolean delete) {
881         checkValidity(
882             nodeName, delete ? "Deleting member" : "Removing member");
883 
884         if (delete && feederManager.activeReplicas().contains(nodeName)) {
885             throw new MemberActiveException(
886                 "Attempt to delete an active node: " + nodeName);
887         }
888 
889         /*
890          * First remove it from the cached group, effectively setting new
891          * durability requirements, for the ensuing group db updates.
892          */
893         RepNodeImpl node = group.removeMember(nodeName, delete);
894 
895         /*
896          * Shutdown any feeder that may be active with the replica. Unless
897          * deleting, any subsequent attempts by the replica to rejoin the group
898          * will result in a failure.
899          */
900         feederManager.shutdownFeeder(node);
901         repGroupDB.removeMember(node, delete);
902     }
903 
904     /**
905      * Update the network address of a node.
906      *
     * Note that the address of a live node can't be updated; a
     * ReplicaStateException is thrown in that case.
909      *
910      * @param nodeName identifies the node to be updated
911      * @param newHostName the new host name of this node
912      * @param newPort the new port of this node
913      */
updateAddress(String nodeName, String newHostName, int newPort)914     public void updateAddress(String nodeName,
915                               String newHostName,
916                               int newPort) {
917         final RepNodeImpl node =
918             checkValidity(nodeName, "Updating node's address");
919 
920         /* Check whether the node is still alive. */
921         if (feederManager.getFeeder(nodeName) != null) {
922             throw new ReplicaStateException
923                 ("Can't update the network address for a live node.");
924         }
925 
926         /* Update the node information in the group database. */
927         node.setHostName(newHostName);
928         node.setPort(newPort);
929         node.setQuorumAck(false);
930         repGroupDB.updateMember(node, true);
931     }
932 
933     /**
934      * Transfer the master role to one of the specified replicas.
935      * <p>
936      * We delegate most of the real work to an instance of the {@link
937      * MasterTransfer} class.  Here, after some simple initial validity
938      * checking, we're concerned with coordinating the potential for multiple
939      * overlapping Master Transfer operation attempts.  The possible outcomes
940      * are:
941      * <ol>
942      * <li>complete success ({@code done == true})
943      * <ul>
944      * <li>
945      * don't unblock txns here; that'll happen automatically as part of the
946      * usual handling when the environment transitions from master->replica
947      * state.
948      * <li>
949      * don't clear xfrInProgress, because we don't want to allow another
950      * attempt to supersede
951      * </ul>
952      * <li>timeout before establishing a winner (no superseder)
953      * <ul>
954      * <li>unblock txns
955      * <li>clear xfrInProgress
956      * </ul>
957      * <li>superseded (see {@link #setUpTransfer})
958      * <ul>
959      * <li>abort existing op (if permitted), unblock txns before unleashing the
960      * new one
961      * <li>replace xfrInProgress
962      * </ul>
963      * <li>env is closed (or invalidated because of an error) during the
964      * operation
965      * <ul>
966      * <li>release the block
967      * <li>leave xfrInProgress as is.
968      * </ul>
969      * </ol>
970      *
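     * @param replicas candidate targets for new master role
     * @param timeout time limit, in msec
     * @param force whether to replace any existing, in-progress
     * transfer operation
     * @return the name of the node that won the transfer; this node's own
     * name if it is already master and appears in {@code replicas}
     */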
transferMaster(Set<String> replicas, long timeout, boolean force)976     public String transferMaster(Set<String> replicas,
977                                  long timeout,
978                                  boolean force) {
979         if (replicas == null || replicas.isEmpty()) {
980             throw new IllegalArgumentException
981                 ("Parameter 'replicas' cannot be null or empty");
982         }
983         if (!nodeState.getRepEnvState().isMaster()) {
984             throw new IllegalStateException("Not currently master");
985         }
986         if (replicas.contains(getNodeName())) {
987 
988             /*
989              * The local node is on the list of candidate new masters, and
990              * we're already master: the operation is trivially satisfied.
991              */
992             return getNodeName();
993         }
994         for (String rep : replicas) {
995             RepNodeImpl node = group.getNode(rep);
996             if (node == null || node.isRemoved()) {
997                 throw new IllegalArgumentException
998                     ("Node '" + rep +
999                      "' is not currently an active member of the group");
1000             } else if (!node.getType().isElectable()) {
1001                 throw new IllegalArgumentException
1002                     ("Node '" + rep +
1003                      "' must have node type ELECTABLE, but had type " +
1004                      node.getType());
1005             }
1006         }
1007 
1008         MasterTransfer xfr = setUpTransfer(replicas, timeout, force);
1009         boolean done = false;
1010         try {
1011             String winner = xfr.transfer();
1012             done = true;
1013             return winner;
1014         } finally {
1015             synchronized (this) {
1016                 if (xfrInProgress == xfr && !done) {
1017                     xfrInProgress = null;
1018                 }
1019             }
1020         }
1021     }
1022 
1023     /**
1024      * Sets up a Master Transfer operation, ensuring that only one operation
1025      * can be in progress at a time.
1026      */
setUpTransfer(Set<String> replicas, long timeout, boolean force)1027     synchronized private MasterTransfer setUpTransfer(Set<String> replicas,
1028                                                       long timeout,
1029                                                       boolean force) {
1030         boolean reject = false; // initial guess, refine below if nec.
1031         if (xfrInProgress != null) {
1032             reject = true;      // next best guess, refine below again if nec.
1033 
1034             /*
1035              * If the new operation is "forcing", see if we can abort the
1036              * existing one.
1037              */
1038             if (force &&
1039                 xfrInProgress.abort
1040                 (new MasterTransferFailureException("superseded"))) {
1041                 reject = false;
1042 
1043                 repImpl.unblockTxnCompletion();
1044             }
1045         }
1046         if (reject) {
1047             throw new MasterTransferFailureException
1048                 ("another Master Transfer (started at " +
1049                  new Date(xfrInProgress.getStartTime()) +
1050                  ") is already in progress");
1051         }
1052         xfrInProgress = new MasterTransfer(replicas, timeout, this);
1053         return xfrInProgress;
1054     }
1055 
getActiveTransfer()1056     public MasterTransfer getActiveTransfer() {
1057         return xfrInProgress;
1058     }
1059 
1060     /**
1061      * Called by the RepNode when a transition to replica status has completely
1062      * finished.
1063      */
clearActiveTransfer()1064     public synchronized void clearActiveTransfer() {
1065         xfrInProgress = null;
1066     }
1067 
1068     /**
1069      * Performs some basic validity checking, common code for some
1070      * Group Membership operations.
1071      *
1072      * @param nodeName name of a replica node on which an operation is
1073      * to be performed
1074      * @param actionName textual description of the operation (for
1075      * exception message)
1076      * @return the named node
1077      */
checkValidity(String nodeName, String actionName)1078     private RepNodeImpl checkValidity(String nodeName, String actionName)
1079         throws MemberNotFoundException {
1080 
1081         if (!nodeState.getRepEnvState().isMaster()) {
1082             throw EnvironmentFailureException.unexpectedState
1083                 ("Not currently a master. " + actionName + " must be " +
1084                  "invoked on the node that's currently the master.");
1085         }
1086 
1087         final RepNodeImpl node = group.getNode(nodeName);
1088         if (node == null) {
1089             throw new MemberNotFoundException("Node:" + nodeName +
1090                                               "is not a member of the group:" +
1091                                               group.getName());
1092         }
1093 
1094         if (node.isRemoved() && node.isQuorumAck()) {
1095             throw new MemberNotFoundException("Node:" + nodeName +
1096                                               "is not currently a member of " +
1097                                               "the group:" + group.getName() +
1098                                               " It had been removed.");
1099         }
1100 
1101         /* Check if the node is the master itself. */
1102         if (nodeName.equals(getNodeName())) {
1103             throw new MasterStateException(getRepImpl().
1104                                            getStateChangeEvent());
1105         }
1106 
1107         return node;
1108     }
1109 
1110     /**
1111      * Updates the cached group info for the node, avoiding a database read.
1112      *
1113      * @param updateNameIdPair the node whose localCBVLSN must be updated.
1114      * @param barrierState the new node syncup state
1115      */
updateGroupInfo(NameIdPair updateNameIdPair, RepGroupImpl.BarrierState barrierState)1116     public void updateGroupInfo(NameIdPair updateNameIdPair,
1117                                 RepGroupImpl.BarrierState barrierState) {
1118 
1119         RepNodeImpl node = group.getMember(updateNameIdPair.getName());
1120         if (node == null) {
1121             /*  A subsequent refresh will get it, along with the new node. */
1122             return;
1123         }
1124 
1125         LoggerUtils.fine(logger, repImpl,
1126                          "LocalCBVLSN for " + updateNameIdPair +
1127                          " updated to " + barrierState +
1128                          " from " + node.getBarrierState().getLastCBVLSN());
1129         node.setBarrierState(barrierState);
1130         globalCBVLSN.recalculate(group);
1131     }
1132 
1133     /**
1134      * Recalculate the Global CBVLSN, provoked by Replay, to ensure that the
1135      * replica's global CBVLSN is up to date.
1136      */
recalculateGlobalCBVLSN()1137     void recalculateGlobalCBVLSN() {
1138         globalCBVLSN.recalculate(group);
1139     }
1140 
getCBVLSNTracker()1141     LocalCBVLSNTracker getCBVLSNTracker() {
1142         return cbvlsnTracker;
1143     }
1144 
freezeLocalCBVLSN()1145     public void freezeLocalCBVLSN() {
1146         cbvlsnTracker.incrementFreezeCounter();
1147     }
1148 
unfreezeLocalCBVLSN()1149     public void unfreezeLocalCBVLSN() {
1150         cbvlsnTracker.decrementFreezeCounter();
1151     }
1152 
1153     /**
1154      * Finds a master node.
1155      *
1156      * @throws DatabaseException
1157      */
findMaster()1158     private void findMaster()
1159         throws DatabaseException {
1160 
1161         refreshCachedGroup();
1162         elections.startLearner();
1163         LoggerUtils.info(logger, repImpl, "Current group size: " +
1164                          group.getElectableGroupSize());
1165         final RepNodeImpl thisNode = group.getNode(nameIdPair.getName());
1166         if ((thisNode == null) &&
1167 
1168             /*
1169              * Secondary nodes are not stored in the group DB, so they will not
1170              * be found even though they are not new.  Use group UUID to
1171              * distinguish -- it is only unknown if the node is new.
1172              */
1173             (nodeType.isElectable() || group.hasUnknownUUID())) {
1174 
1175             /* A new node */
1176             LoggerUtils.info(logger, repImpl, "New node " + nameIdPair +
1177                              " unknown to rep group");
1178             Set<InetSocketAddress> helperSockets = repImpl.getHelperSockets();
1179 
1180             /*
1181              * Not present in the replication group. Use the helper, to get
1182              * to a master and enter the group.
1183              */
1184             if ((group.getElectableGroupSize() == 0) &&
1185                 (helperSockets.size() == 1) &&
1186                 nodeType.isElectable() &&
1187                 serviceDispatcher.getSocketAddress().
1188                     equals(helperSockets.iterator().next())) {
1189                 /* A startup situation, should this node become master. */
1190                 selfElect();
1191                 elections.updateRepGroup(group);
1192                 return;
1193             }
1194             try {
1195                 queryGroupForMembership();
1196             } catch (InterruptedException e) {
1197                 throw EnvironmentFailureException.unexpectedException(e);
1198             }
1199         } else if ((thisNode != null) && thisNode.isRemoved()) {
1200             throw EnvironmentFailureException.unexpectedState
1201                 ("Node: " + nameIdPair.getName() +
1202                  " was previously deleted.");
1203         } else {
1204 
1205             /* An existing node */
1206             LoggerUtils.info(logger, repImpl,
1207                              "Existing node " + nameIdPair.getName() +
1208                              " querying for a current master.");
1209 
1210             /*
1211              * The group has other members, see if they know of a master,
1212              * along with any helpers that were also supplied.
1213              */
1214             Set<InetSocketAddress> helperSockets = repImpl.getHelperSockets();
1215             helperSockets.addAll(group.getAllHelperSockets());
1216             elections.getLearner().queryForMaster(helperSockets);
1217         }
1218     }
1219 
1220     /**
     * This method enforces the requirement that the addresses within a
     * replication group must either all be loopback addresses or all be
     * non-loopback IP addresses. Mixing them means that the node with a
     * loopback address cannot be contacted by a different node.
1225      *
1226      * @param helperSockets the helper nodes used by this node when contacting
1227      * the master.
1228      */
checkLoopbackAddresses(Set<InetSocketAddress> helperSockets)1229     private void checkLoopbackAddresses(Set<InetSocketAddress> helperSockets) {
1230 
1231         final InetAddress myAddress = mySocket.getAddress();
1232         final boolean isLoopback= myAddress.isLoopbackAddress();
1233 
1234         for (InetSocketAddress socketAddress : helperSockets) {
1235             final InetAddress nodeAddress = socketAddress.getAddress();
1236 
1237             if (nodeAddress.isLoopbackAddress() ==  isLoopback) {
1238                 continue;
1239             }
1240             String message = mySocket +
1241                 " the address associated with this node, " +
1242                 (isLoopback? "is " : "is not ") +  "a loopback address." +
1243                 " It conflicts with an existing use, by a different node " +
1244                 " of the address:" +
1245                 socketAddress +
1246                 (!isLoopback ? " which is a loopback address." :
1247                  " which is not a loopback address.") +
1248                 " Such mixing of addresses within a group is not allowed, " +
1249                 "since the nodes will not be able to communicate with " +
1250                 "each other.";
1251             throw new IllegalArgumentException(message);
1252         }
1253     }
1254 
1255     /**
1256      * Communicates with existing nodes in the group in order to figure out how
1257      * to start up, in the case where the local node does not appear to be in
1258      * the (local copy of the) GroupDB, typically because the node is starting
1259      * up with an empty env directory.  It could be that this is a new node
1260      * (never before been part of the group).  Or it could be a pre-existing
1261      * group member that has lost its env dir contents and wants to be restored
1262      * via a Network Restore operation.
1263      * <p>
1264      * We first try to find a currently running master node.  (An authoritative
1265      * master can easily handle either of the above-mentioned situations.)  If
1266      * we can't find a master, we look for other running nodes that may know of
1267      * us (by asking them for their Group information).
1268      * <p>
1269      * We query the designated helpers and all known learners.  The helpers are
1270      * the ones that were identified via the node's configuration, while the
1271      * learners are the ones currently in the member database.  We use both in
1272      * order to cast the widest possible net.
1273      * <p>
1274      * Returns normally when the master is found.
1275      *
1276      * @throws InterruptedException if the current thread is interrupted,
1277      *         typically due to a shutdown
1278      * @throws InsufficientLogException if the environment requires a network
1279      *         restore
1280      * @see #findRestoreSuppliers
1281      */
queryGroupForMembership()1282     private void queryGroupForMembership()
1283         throws InterruptedException {
1284 
1285         Set<InetSocketAddress> helperSockets = repImpl.getHelperSockets();
1286 
1287         checkLoopbackAddresses(helperSockets);
1288 
1289         /*
1290          * Not in the rep group. Use the designated helpers and other members
1291          * of the group to help us figure out how to get started.
1292          */
1293         final Set<InetSocketAddress> helpers =
1294             new HashSet<InetSocketAddress>(helperSockets);
1295         helpers.addAll(group.getAllHelperSockets());
1296         if (helpers.isEmpty()) {
1297             throw EnvironmentFailureException.unexpectedState
1298                 ("Need a helper to add a new node into the group");
1299         }
1300 
1301         NameIdPair groupMasterNameId;
1302         while (true) {
1303             elections.getLearner().queryForMaster(helpers);
1304             groupMasterNameId = masterStatus.getGroupMasterNameId();
1305             if (!groupMasterNameId.hasNullId()) {
1306                 /* A new, or pre-query, group master. */
1307                 if (nameIdPair.hasNullId() &&
1308                     groupMasterNameId.getName().equals(nameIdPair.getName())) {
1309                     /*
1310                      * Residual obsolete information in replicas, ignore it.
1311                      * Can't be master if we don't know our own id, but some
1312                      * other node does! This state means that the node was a
1313                      * master in the recent past, but has had its environment
1314                      * deleted since that time.
1315                      */
1316                     try {
1317                         Thread.sleep(MASTER_QUERY_INTERVAL);
1318                     } catch (InterruptedException e) {
1319                         throw EnvironmentFailureException.unexpectedException(e);
1320                     }
1321                     continue;
1322                 }
1323 
1324                 if (checkGroupMasterIsAlive(groupMasterNameId)) {
1325                     /* Use the current group master if it's alive. */
1326                     break;
1327                 }
1328             }
1329 
1330             /*
1331              * If there's no master, or the last known master cannot be
1332              * reached, see if anyone thinks we're actually already in the
1333              * group, and could supply us with a Network Restore. (Remember,
1334              * we're here only if we didn't find ourselves in the local
1335              * GroupDB. So we could be in a group restore from backup
1336              * situation.)
1337              */
1338             findRestoreSuppliers(helpers);
1339 
1340             Thread.sleep(MASTER_QUERY_INTERVAL);
1341         }
1342         LoggerUtils.info(logger, repImpl, "New node " + nameIdPair.getName() +
1343                          " located master: " + groupMasterNameId);
1344     }
1345 
1346     /**
1347      * Check that the master found by querying other group nodes is indeed
1348      * alive and that we are not dealing with obsolete cached information.
1349      *
1350      * @return true if the master node could be contacted and was truly alive
1351      *
1352      * TODO: handle protocol version mismatch here and in DbPing, also
1353      * consolidate code so that a single copy is shared.
1354      */
checkGroupMasterIsAlive(NameIdPair groupMasterNameId)1355     private boolean checkGroupMasterIsAlive(NameIdPair groupMasterNameId) {
1356 
1357         DataChannel channel = null;
1358 
1359         try {
1360             final InetSocketAddress masterSocket =
1361                 masterStatus.getGroupMaster();
1362 
1363             final BinaryNodeStateProtocol protocol =
1364                 new BinaryNodeStateProtocol(NameIdPair.NOCHECK, null);
1365 
1366             /* Build the connection. Set the parameter connectTimeout.*/
1367             channel = repImpl.getChannelFactory().
1368                 connect(masterSocket,
1369                         new ConnectOptions().
1370                         setTcpNoDelay(true).
1371                         setOpenTimeout(5000).
1372                         setReadTimeout(5000));
1373             ServiceDispatcher.doServiceHandshake
1374                 (channel, BinaryNodeStateService.SERVICE_NAME);
1375 
1376             /* Send a NodeState request to the node. */
1377             protocol.write
1378                 (protocol.new BinaryNodeStateRequest(groupMasterNameId.getName(),
1379                                                      group.getName()),
1380                  channel);
1381 
1382             /* Get the response and return the NodeState. */
1383             BinaryNodeStateResponse response =
1384                 protocol.read(channel, BinaryNodeStateResponse.class);
1385 
1386             ReplicatedEnvironment.State state =  response.getNodeState();
1387            return (state != null) && state.isMaster();
1388         } catch (Exception e) {
1389             LoggerUtils.info(logger, repImpl,
1390                              "Queried master:" + groupMasterNameId +
1391                              " unavailable. Reason:" + e);
1392             return false;
1393         } finally {
1394             if (channel != null) {
1395                 try {
1396                     channel.close();
1397                 } catch (IOException ioe) {
1398                     /* Ignore it */
1399                 }
1400             }
1401         }
1402     }
1403 
1404     /**
1405      * Sets up a Network Restore, as part of the process of restoring an entire
1406      * group from backup, by producing an appropriate {@code
1407      * InsufficientLogException} if possible.
1408      * <p>
1409      * Queries each of the supplied helper hosts for their notion of the group
1410      * make-up.  If any of them consider us to be already in the group, then
1411      * instead of joining the group as a new node we ought to try a Network
1412      * Restore; and the node(s) that do already know of us are the suitable
1413      * suppliers for it.
1414      *
1415      * @throws InsufficientLogException in the successful case, if one or more
1416      * suitable suppliers for a Network Restore can be found; otherwise just
1417      * returns.
1418      */
findRestoreSuppliers(Set<InetSocketAddress> helpers)1419     public void findRestoreSuppliers(Set<InetSocketAddress> helpers) {
1420         final Set<ReplicationNode> suppliers = new HashSet<ReplicationNode>();
1421         RepGroupProtocol protocol =
1422             new RepGroupProtocol(group.getName(), nameIdPair, repImpl,
1423                                  repImpl.getChannelFactory());
1424 
1425         for (InetSocketAddress helper : helpers) {
1426             MessageExchange msg =
1427                 protocol.new MessageExchange(helper,
1428                                              GroupService.SERVICE_NAME,
1429                                              protocol.new GroupRequest());
1430 
1431             /*
1432              * Just as we did in the queryForMaster() case, quietly ignore any
1433              * unsurprising response error or socket exceptions; we'll retry
1434              * later if we end up not finding any Network Restore suppliers.
1435              */
1436             msg.run();
1437             ResponseMessage response = msg.getResponseMessage();
1438             if (response == null ||
1439                 protocol.RGFAIL_RESP.equals(response.getOp())) {
1440                 continue;
1441             } else if (!protocol.GROUP_RESP.equals(response.getOp())) {
1442                 LoggerUtils.warning(logger, repImpl,
1443                                     "Expected GROUP_RESP, got " +
1444                                     response.getOp() + ": " + response);
1445                 continue;
1446             }
1447             GroupResponse groupResp = (GroupResponse) response;
1448 
1449             /*
1450              * If the response from the remote node shows that I am already a
1451              * member of the group, add the node to the list of nodes that will
1452              * serve the Network Restore.
1453              */
1454             RepGroupImpl groupInfo = groupResp.getGroup();
1455             RepNodeImpl me = groupInfo.getNode(nameIdPair.getName());
1456             if (me == null || me.isRemoved() || !me.isQuorumAck()) {
1457                 continue;
1458             }
1459 
1460             ReplicationNode supplier = groupInfo.getMember(helper);
1461             if (supplier != null) {
1462                 suppliers.add(supplier);
1463             }
1464         }
1465 
1466         if (suppliers.isEmpty()) {
1467             return;
1468         }
1469 
1470         throw new InsufficientLogException(this, VLSN.NULL_VLSN, suppliers);
1471     }
1472 
1473     /**
1474      * Elects this node as the master. The operation is only valid when the
1475      * group consists of just this node, and when this is an ELECTABLE node.
1476      *
1477      * @throws DatabaseException
1478      * @throws IllegalStateException if the node type is not ELECTABLE
1479      */
selfElect()1480     private void selfElect()
1481         throws DatabaseException {
1482 
1483         if (!nodeType.isElectable()) {
1484             throw new IllegalStateException(
1485                 "Cannot elect node " + nameIdPair.getName() +
1486                 " as master because its node type, " + nodeType +
1487                 ", is not ELECTABLE");
1488         }
1489         nameIdPair.setId(RepGroupImpl.getFirstNodeId());
1490 
1491         /* Master by default of a nascent group. */
1492         Proposal proposal = new TimebasedProposalGenerator().nextProposal();
1493         elections.getLearner().processResult(proposal,
1494                                              suggestionGenerator.get(proposal));
1495         LoggerUtils.info(logger, repImpl, "Nascent group. " +
1496                          nameIdPair.getName() +
1497                          " is master by virtue of being the first node.");
1498         masterStatus.sync();
1499         nodeState.changeAndNotify(MASTER, masterStatus.getNodeMasterNameId());
1500         repImpl.getVLSNIndex().initAsMaster();
1501         repGroupDB.addFirstNode();
1502         refreshCachedGroup();
1503         /* Unsync so that the run loop does not call for an election. */
1504         masterStatus.unSync();
1505     }
1506 
1507     /**
1508      * Establishes this node as the master, after re-initializing the group
1509      * with this as the sole node in the group. This method is used solely
1510      * as part of the DbResetRepGroup utility.
1511      *
1512      * @throws IllegalStateException if the node type is not ELECTABLE
1513      */
reinitSelfElect()1514     private void reinitSelfElect() {
1515         if (nodeType.isSecondary()) {
1516             throw new IllegalStateException(
1517                 "Cannot elect node " + nameIdPair.getName() +
1518                 " as master because its node type, " + nodeType +
1519                 ", is not ELECTABLE");
1520         }
1521 
1522         /* Establish an empty group so transaction commits can proceed. */
1523         group = repGroupDB.emptyGroup;
1524         LoggerUtils.info(logger, repImpl, "Reinitializing group to node " +
1525                          nameIdPair);
1526 
1527         /*
1528          * Unilaterally transition the nodeState to Master, so that write
1529          * transactions needed to reset the group and establish this node can
1530          * be issued against the environment.
1531          */
1532         nodeState.changeAndNotify(MASTER, masterStatus.getNodeMasterNameId());
1533         repImpl.getVLSNIndex().initAsMaster();
1534 
1535         /*
1536          * Start using new log files. The file ensures that we can safely
1537          * truncate the past VLSNs.
1538          */
1539         repImpl.forceLogFileFlip();
1540 
1541         CheckpointConfig ckptConfig = new CheckpointConfig();
1542         ckptConfig.setForce(true);
1543 
1544         /*
1545          * The checkpoint ensures that we do not have to replay VLSNs from the
1546          * prior group and that we have a complete VLSN index on disk.
1547          */
1548         repImpl.getCheckpointer().doCheckpoint(ckptConfig,
1549                                                "Reinit of RepGroup");
1550         VLSN lastOldVLSN = repImpl.getVLSNIndex().getRange().getLast();
1551 
1552         /* Now create the new rep group on disk. */
1553         repGroupDB.reinitFirstNode(lastOldVLSN);
1554         refreshCachedGroup();
1555 
1556         long lastOldFile =
1557             repImpl.getVLSNIndex().getLTEFileNumber(lastOldVLSN);
1558 
1559         /*
1560          * Discard the VLSN index covering the pre group reset VLSNS, to ensure
1561          * that the pre reset part of the log is never replayed. We don't want
1562          * to replay this part of the log, since it contains references to
1563          * repnodes via node ids that are no longer part of the reset rep
1564          * group. Note that we do not reuse rep node ids, that is, rep node id
1565          * sequence continues across the reset operation and is not itself
1566          * reset. Nodes joining the new group will need to do a network restore
1567          * when they join the group.
1568          *
1569          * Don't perform the truncation if RESET_REP_GROUP_RETAIN_UUID is true.
1570          * In that case, we are only removing the rep group members, but
1571          * retaining the remaining information, because we will be restarting
1572          * the rep group in place with an old secondary acting as an electable
1573          * node.
1574          */
1575         final boolean retainUUID =
1576             getConfigManager().getBoolean(RESET_REP_GROUP_RETAIN_UUID);
1577         if (!retainUUID) {
1578             repImpl.getVLSNIndex().truncateFromHead(lastOldVLSN, lastOldFile);
1579         }
1580 
1581         elections.startLearner();
1582         /* Unsync so that the run loop does not call for an election. */
1583         masterStatus.unSync();
1584     }
1585 
    /**
     * The top level Master/Feeder or Replica loop in support of replication.
     * It's responsible for driving the node level state changes resulting
     * from elections initiated either by this node, or by other members of the
     * group.
     * <p>
     * Each pass of the loop resets the node state to UNKNOWN, obtains a
     * master (by election, or by querying the group when this node cannot
     * take part in elections), syncs to that master and then runs either the
     * feeders (when this node is the master) or the replica loop until that
     * role ends.
     * <p>
     * The thread is terminated via an orderly shutdown initiated as a result
     * of an interrupt issued by the shutdown() method. Any exception that is
     * not handled by the run method itself is caught by the thread's uncaught
     * exception handler, and results in the RepImpl being made invalid.  In
     * that case, the application is responsible for closing the Replicated
     * Environment, which will provoke the shutdown.
     * <p>
     * Note: This method currently runs either the feeder loop or the replica
     * loop. With R to R support, it would be possible for a Replica to run
     * both. This will be a future feature.
     */
    @Override
    public void run() {
        /* Set to indicate an error-initiated shutdown. */
        Error repNodeError = null;
        try {
            /* The node type is only mentioned when it is not ELECTABLE. */
            LoggerUtils.info(logger, repImpl,
                             "Node " + nameIdPair.getName() + " started" +
                             (!nodeType.isElectable() ?
                              " as " + nodeType :
                              ""));
            while (!isShutdownOrInvalid()) {
                if (nodeState.getRepEnvState() != UNKNOWN) {
                    /* Avoid unnecessary state changes. */
                    nodeState.changeAndNotify(UNKNOWN, NameIdPair.NULL);
                }

                /*
                 * Initiate elections if we don't have a group master, or there
                 * is a master, but we were unable to use it.
                 */
                if (masterStatus.getGroupMasterNameId().hasNullId() ||
                    masterStatus.inSync()) {

                    /*
                     * But we can't if we don't have our own node ID yet or if
                     * we are not ELECTABLE.
                     */
                    if (nameIdPair.hasNullId() || !nodeType.isElectable()) {
                        queryGroupForMembership();
                    } else {
                        elections.initiateElection(group, electionQuorumPolicy);

                        /*
                         * Subsequent elections must always use a simple
                         * majority.
                         */
                        electionQuorumPolicy = QuorumPolicy.SIMPLE_MAJORITY;
                    }
                    /*
                     * In case elections were shut down. The finally block
                     * below still runs on this return.
                     */
                    if (isShutdownOrInvalid()) {
                        return;
                    }
                }

                /* Start syncing this node to the new group master */
                masterStatus.sync();

                if (masterStatus.isNodeMaster()) {
                    repImpl.getVLSNIndex().initAsMaster();
                    replica.masterTransitionCleanup();

                    /* Master is ready for business. */
                    nodeState.changeAndNotify
                            (MASTER, masterStatus.getNodeMasterNameId());

                    /*
                     * Update the JE version information stored for the master
                     * in the RepGroupDB, if needed.
                     */
                    maybeUpdateMasterJEVersion();

                    /*
                     * The comment below states that the feeder manager has
                     * been shut down once this call returns.
                     */
                    feederManager.runFeeders();

                    /*
                     * At this point, the feeder manager has been shutdown.
                     * Re-initialize the VLSNIndex put latch mechanism, which
                     * is present on masters to maintain a tip cache of the
                     * last record on the replication stream, and by all
                     * nodes when doing checkpoint vlsn consistency waiting.
                     * Create a new feeder manager, should this node become a
                     * master later on.
                     * Set the node to UNKNOWN state right away, because the
                     * MasterTxn will use node state to prevent the advent of
                     * any replicated writes.  Once the VLSNIndex is
                     * initialized for replica state, the node will NPE if it
                     * attempts execute replicated writes.
                     */
                    nodeState.changeAndNotify(UNKNOWN, NameIdPair.NULL);
                    repImpl.getVLSNIndex().initAsReplica();
                    /* The convert hooks only run when assertions are enabled. */
                    assert runConvertHooks();
                    feederManager = new FeederManager(this);
                } else {
                    /*
                     * Replica will notify us when connection is successfully
                     * made, and Feeder handshake done, at which point we'll
                     * update nodeState.
                     */
                    replica.replicaTransitionCleanup();
                    replica.runReplicaLoop();
                }
            }
        } catch (InterruptedException e) {
            /* Logged only; no shutdown exception is saved for an interrupt. */
            LoggerUtils.fine(logger, repImpl,
                             "RepNode main thread interrupted - " +
                             " forced shutdown.");
        } catch (GroupShutdownException e) {
            saveShutdownException(e);
            LoggerUtils.fine(logger, repImpl,
                             "RepNode main thread sees group shutdown - " + e);
        } catch (InsufficientLogException e) {
            saveShutdownException(e);
        } catch (RuntimeException e) {
            LoggerUtils.fine(logger, repImpl,
                             "RepNode main thread sees runtime ex - " + e);
            saveShutdownException(e);
            /* Rethrown after the finally block has run. */
            throw e;
        } catch (Error e) {
            LoggerUtils.fine(logger, repImpl, e +
                             " incurred during repnode loop");
            /* Remembered here and rethrown from the finally block. */
            repNodeError = e;
            repImpl.invalidate(e);
        } finally {
            try {
                LoggerUtils.info(logger, repImpl,
                                 "RepNode main thread shutting down.");

                /*
                 * An Error is rethrown before shutdown() is attempted. Since
                 * it propagates out of this finally block, the DETACHED
                 * transition and cleanup() below are skipped in that case.
                 */
                if (repNodeError != null) {
                    LoggerUtils.info(logger, repImpl,
                                     "Node state at shutdown:\n"+
                                     repImpl.dumpState());
                    throw repNodeError;
                }
                Throwable exception = getSavedShutdownException();

                /*
                 * The state dump is logged at FINE for a shutdown without a
                 * saved exception, and at INFO along with the exception
                 * message otherwise.
                 */
                if (exception == null) {
                    LoggerUtils.fine(logger, repImpl,
                                     "Node state at shutdown:\n"+
                                     repImpl.dumpState());
                } else {
                    LoggerUtils.info(logger, repImpl,
                                     "RepNode shutdown exception:\n" +
                                     exception.getMessage() +
                                     repImpl.dumpState());
                }

                try {
                    shutdown();
                } catch (DatabaseException e) {
                    /*
                     * Associate the saved shutdown exception with the
                     * shutdown failure before rethrowing it; like the Error
                     * case, this bypasses the statements after the outer try.
                     */
                    RepUtils.chainExceptionCause(e, exception);
                    LoggerUtils.severe(logger, repImpl,
                                       "Unexpected exception during shutdown" +
                                       e);
                    throw e;
                }
            } catch (InterruptedException e1) {
                // Ignore exceptions on exit
            }
            nodeState.changeAndNotify(DETACHED, NameIdPair.NULL);
            cleanup();
        }
    }
1754 
1755     /**
1756      * Update the information stored for the master in the RepGroupDB if
1757      * storing it is supported and the current version is different from the
1758      * recorded version.
1759      */
maybeUpdateMasterJEVersion()1760     private void maybeUpdateMasterJEVersion() {
1761 
1762         /* Check if storing JE version information is supported */
1763         if (group.getFormatVersion() < RepGroupImpl.FORMAT_VERSION_3) {
1764             return;
1765         }
1766 
1767         final JEVersion currentJEVersion = repImpl.getCurrentJEVersion();
1768         final RepNodeImpl node = group.getMember(nameIdPair.getName());
1769 
1770         if (currentJEVersion.equals(node.getJEVersion())) {
1771             return;
1772         }
1773         node.updateJEVersion(currentJEVersion);
1774         repGroupDB.updateMember(node, false);
1775     }
1776 
notifyReplicaConnected()1777     void notifyReplicaConnected() {
1778         nodeState.changeAndNotify(REPLICA, masterStatus.getNodeMasterNameId());
1779     }
1780 
1781     /**
1782      * Returns true if the node has been shutdown or if the underlying
1783      * environment has been invalidated. It's used as the basis for exiting
1784      * the FeederManager or the Replica.
1785      */
isShutdownOrInvalid()1786     boolean isShutdownOrInvalid() {
1787         if (isShutdown()) {
1788             return true;
1789         }
1790         if (getRepImpl().wasInvalidated()) {
1791             saveShutdownException(getRepImpl().getInvalidatingException());
1792             return true;
1793         }
1794         return false;
1795     }
1796 
    /**
     * Used to shutdown all activity associated with this replication stream.
     * If method is invoked from different thread of control, it will wait
     * until the rep node thread exits. If it's from the same thread, it's the
     * caller's responsibility to exit the thread upon return from this method.
     * <p>
     * The method returns immediately when shutdownDone() reports true.
     * Otherwise it proceeds in order: notify monitors, stop accepting
     * requests, shut down elections, the feeder queue and the replica, stop
     * the thread, shut down the utility services, abort any active master
     * transfer, shut down the service dispatcher, release the ready latch and
     * finally cancel the timer tasks.
     *
     * @throws InterruptedException
     * @throws DatabaseException
     */
    public void shutdown()
        throws InterruptedException, DatabaseException {

        /* NOTE(review): presumably true once a shutdown is underway/done. */
        if (shutdownDone()) {
            return;
        }

        LoggerUtils.info(logger, repImpl, "Shutting down node " + nameIdPair);

        /* Fire a LeaveGroup if this RepNode is valid. */
        if (repImpl.isValid()) {
            monitorEventManager.notifyLeaveGroup(getLeaveReason());
        }

        /* Stop accepting any new network requests. */
        serviceDispatcher.preShutdown();

        if (elections != null) {
            elections.shutdown();
        }

        /* Initiate the FeederManager soft shutdown if it's active. */
        feederManager.shutdownQueue();

        /*
         * shutdownGroupOnClose() stores a non-negative catchup time, so a
         * value >= 0 on a master is treated here as a group shutdown.
         */
        if ((getReplicaCloseCatchupMs() >= 0) &&
            (nodeState.getRepEnvState().isMaster())) {

            /*
             * A group shutdown. Shutting down the queue will cause the
             * FeederManager to shutdown its feeders and exit.
             */
            this.join();
        }

        /* Shutdown the replica, if it's active. */
        replica.shutdown();

        shutdownThread(logger);

        LoggerUtils.info(logger, repImpl,
                         "RepNode main thread: " + this.getName() + " exited.");
        /* Shut down all other services. */
        utilityServicesShutdown();

        /*
         * Shutdown all the services before shutting down the dispatcher. An
         * active master transfer is aborted with the saved shutdown
         * exception, or with a generic failure when none was saved.
         */
        MasterTransfer mt = getActiveTransfer();
        if (mt != null) {
            Exception ex = getSavedShutdownException();
            if (ex == null) {
                ex = new MasterTransferFailureException("shutting down");
            }
            mt.abort(ex);
        }
        serviceDispatcher.shutdown();
        LoggerUtils.info(logger, repImpl,
                         nameIdPair + " shutdown completed.");

        /* Clear the group master and release the ready latch. */
        masterStatus.setGroupMaster(null, NameIdPair.NULL);
        readyLatch.releaseAwait(getSavedShutdownException());

        /* Cancel the TimerTasks. */
        channelTimeoutTask.cancel();
        if (logFlusher != null) {
            logFlusher.cancelTask();
        }
        timer.cancel();
    }
1872 
1873 
1874     /**
1875      * Soft shutdown for the RepNode thread. Note that since the thread is
1876      * shared by the FeederManager and the Replica, the FeederManager or
1877      * Replica specific soft shutdown actions should already have been done
1878      * earlier.
1879      */
1880     @Override
initiateSoftShutdown()1881     protected int initiateSoftShutdown() {
1882         return getThreadWaitInterval();
1883     }
1884 
1885     /* Get the shut down reason for this node. */
getLeaveReason()1886     private LeaveReason getLeaveReason() {
1887         LeaveReason reason = null;
1888 
1889         Exception exception = getSavedShutdownException();
1890         if (exception == null) {
1891             reason = LeaveReason.NORMAL_SHUTDOWN;
1892         } else if (exception instanceof GroupShutdownException) {
1893             reason = LeaveReason.MASTER_SHUTDOWN_GROUP;
1894         } else {
1895             reason = LeaveReason.ABNORMAL_TERMINATION;
1896         }
1897 
1898         return reason;
1899     }
1900 
    /**
     * Shuts down the auxiliary services of this node. Every service is
     * optional: one whose field is null is skipped.
     */
    private void utilityServicesShutdown() {
        /* These three services are shut down through their own objects. */
        if (ldiff != null) {
            ldiff.shutdown();
        }

        if (logFeederManager != null) {
            logFeederManager.shutdown();
        }

        if (binaryNodeStateService != null) {
            binaryNodeStateService.shutdown();
        }

        /*
         * The node state and group services are instead cancelled by name
         * through the service dispatcher.
         */
        if (nodeStateService != null) {
            serviceDispatcher.cancel(NodeStateService.SERVICE_NAME);
        }

        if (groupService != null) {
            serviceDispatcher.cancel(GroupService.SERVICE_NAME);
        }
    }
1922 
1923     /**
1924      * Must be invoked on the Master via the last open handle.
1925      *
1926      * Note that the method itself does not shutdown the group. It merely
1927      * sets replicaCloseCatchupMs, indicating that the ensuing handle close
1928      * should shutdown the Replicas. The actual coordination with the closing
1929      * of the handle is implemented by ReplicatedEnvironment.shutdownGroup().
1930      *
1931      * @see ReplicatedEnvironment#shutdownGroup(long, TimeUnit)
1932      */
shutdownGroupOnClose(long timeoutMs)1933     public void shutdownGroupOnClose(long timeoutMs)
1934         throws IllegalStateException {
1935 
1936         if (!nodeState.getRepEnvState().isMaster()) {
1937             throw new IllegalStateException
1938                 ("Node state must be " + MASTER +
1939                  ", not " + nodeState.getRepEnvState());
1940         }
1941         replicaCloseCatchupMs = (timeoutMs < 0) ? 0 : timeoutMs;
1942     }
1943 
    /**
     * JoinGroup ensures that a RepNode is actively participating in a
     * replication group. It's invoked each time a replicated environment
     * handle is created.
     *
     * If the node is already participating in a replication group, because
     * it's not the first handle to the environment, it will return without
     * having to wait. Otherwise it will wait until a master is elected and
     * this node is active, either as a Master, or as a Replica.
     *
     * If the node joins as a replica, it will wait further until it has become
     * sufficiently consistent as defined by its consistency argument. By
     * default it uses PointConsistencyPolicy to ensure that it is at least as
     * consistent as the master as of the time the handle was opened.
     *
     * A node can also join in the Unknown state if it has been configured to
     * do so via ENV_UNKNOWN_STATE_TIMEOUT.
     *
     * The wait is attempted at most JOIN_RETRIES times.
     *
     * @param consistency the consistency policy to satisfy when joining as a
     * replica; passed on to joinAsReplica, which substitutes a default policy
     * when it is null
     *
     * @param initialElectionPolicy the quorum policy handed to startup()
     *
     * @throws UnknownMasterException If a master cannot be established within
     * ENV_SETUP_TIMEOUT, unless ENV_UNKNOWN_STATE_TIMEOUT has
     * been set to allow the creation of a handle while in the UNKNOWN state.
     *
     * @throws ReplicaConsistencyException if the node is a replica and the
     * setup timeout expires before the ready latch is released
     *
     * @return MASTER, REPLICA, or UNKNOWN (if ENV_UNKNOWN_STATE_TIMEOUT
     * is set)
     */
    public ReplicatedEnvironment.State
        joinGroup(ReplicaConsistencyPolicy consistency,
                  QuorumPolicy initialElectionPolicy)
        throws ReplicaConsistencyException, DatabaseException {

        final JoinGroupTimeouts timeouts =
                new JoinGroupTimeouts(getConfigManager());

        startup(initialElectionPolicy);
        LoggerUtils.finest(logger, repImpl, "joinGroup " +
                           nodeState.getRepEnvState());

        /* Last retryable failure; thrown if the loop ends without a result. */
        DatabaseException exitException = null;
        int retries=0;
        repImpl.getStartupTracker().start(Phase.BECOME_CONSISTENT);
        repImpl.getStartupTracker().setProgress
           (RecoveryProgress.BECOME_CONSISTENT);
        try {
            for (retries=0; retries < JOIN_RETRIES; retries++ ) {
                try {
                    /* Wait for Feeder/Replica to be fully initialized. */
                    boolean done = getReadyLatch().awaitOrException
                        (timeouts.getTimeout(), TimeUnit.MILLISECONDS);

                    /*
                     * Save the state, and use it from this point forward,
                     * since the node's state may change again.
                     */
                    final ReplicatedEnvironment.State finalState =
                        nodeState.getRepEnvState();
                    if (!done) {
                        /* An election or setup, timeout. */
                        if (finalState.isReplica()) {
                            if (timeouts.timeoutIsForUnknownState()) {
                                /*
                                 * Replica syncing up; move onwards to the
                                 * setup timeout and continue with the syncup.
                                 */
                                timeouts.setSetupTimeout();
                                continue;
                            }
                            throw new ReplicaConsistencyException
                                (String.format("Setup time exceeded %,d ms",
                                               timeouts.getSetupTimeout()),
                                               null);
                        }

                        /*
                         * The unknown state timeout elapsed while the node
                         * was still UNKNOWN: hand back UNKNOWN.
                         */
                        if (finalState.isUnknown() &&
                            timeouts.timeoutIsForUnknownState()) {
                            return UNKNOWN;
                        }

                        /*
                         * Any other timeout ends the retry loop and falls
                         * through to the failure handling after it.
                         */
                        break;
                    }

                    switch (finalState) {
                        case UNKNOWN:

                            /*
                             * State flipped between release of ready latch and
                             * nodeState.getRepEnvState() above; retry for a
                             * Master/Replica state.
                             */
                            continue;

                        case REPLICA:
                            joinAsReplica(consistency);
                            break;

                        case MASTER:
                            LoggerUtils.info(logger, repImpl,
                                             "Joining group as master");
                            break;

                        case DETACHED:
                            throw EnvironmentFailureException.
                                unexpectedState("Node in DETACHED state " +
                                                "while joining group.");
                    }

                    /* Reached for REPLICA and MASTER only. */
                    return finalState;
                } catch (InterruptedException e) {
                    throw EnvironmentFailureException.unexpectedException(e);
                } catch (MasterStateException e) {
                    /* Transition to master while establishing consistency. */
                    LoggerUtils.warning(logger, repImpl,
                                        "Join retry due to master transition: "
                                        + e.getMessage());
                    continue;
                } catch (RestartRequiredException e) {
                    LoggerUtils.warning(logger, repImpl,
                                        "Environment needs to be restarted: " +
                                        e.getMessage());
                    throw e;
                } catch (DatabaseException e) {
                    /*
                     * Only failures caused by exactly a ConnectRetryException
                     * (not a subclass) are candidates for a retry.
                     */
                    Throwable cause = e.getCause();
                    if ((cause != null) &&
                        (cause.getClass() ==
                         Replica.ConnectRetryException.class)) {

                        /*
                         * The master may have changed. Retry if there is time
                         * left to do so. It may result in a new master.
                         */
                        exitException = e;
                        if (timeouts.getTimeout() > 0) {
                            LoggerUtils.warning(logger, repImpl,
                                                "Join retry due to exception: "
                                                + cause.getMessage());
                            continue;
                        }
                    }
                    throw e;
                }
            }
        } finally {
            /* Runs on every exit path, including the returns above. */
            repImpl.getStartupTracker().stop(Phase.BECOME_CONSISTENT);
        }

        /* Timed out or exceeded retries. */
        if (exitException != null) {
            LoggerUtils.warning(logger, repImpl, "Exiting joinGroup after " +
                                retries + " retries." + exitException);
            throw exitException;
        }
        throw new UnknownMasterException(null, repImpl.getStateChangeEvent());
    }
2095 
2096     /**
2097      * Join the group as a Replica ensuring that the node is sufficiently
2098      * consistent as defined by its consistency policy.
2099      *
2100      * @param consistency the consistency policy to use when joining initially
2101      */
joinAsReplica(ReplicaConsistencyPolicy consistency)2102     private void joinAsReplica(ReplicaConsistencyPolicy consistency)
2103         throws InterruptedException {
2104 
2105         if (consistency == null) {
2106             final int consistencyTimeout =
2107                 getConfigManager().getDuration(ENV_CONSISTENCY_TIMEOUT);
2108             consistency = new PointConsistencyPolicy
2109                 (new VLSN(replica.getMasterTxnEndVLSN()),
2110                  consistencyTimeout, TimeUnit.MILLISECONDS);
2111         }
2112 
2113         /*
2114          * Wait for the replica to become sufficiently consistent.
2115          */
2116         consistency.ensureConsistency(repImpl);
2117 
2118         /*
2119          * Flush changes to the file system. The flush ensures in particular
2120          * that any member database updates defining this node itself are not
2121          * lost in case of a process crash. See SR 20607.
2122          */
2123         repImpl.getLogManager().flushNoSync();
2124 
2125         LoggerUtils.info(logger, repImpl, "Joined group as a replica. " +
2126                          " join consistencyPolicy=" + consistency +
2127                          " " + repImpl.getVLSNIndex().getRange());
2128     }
2129 
2130     /**
2131      * Should be called whenever a new VLSN is associated with a log entry
2132      * suitable for Replica/Feeder syncup.
2133      */
trackSyncableVLSN(VLSN syncableVLSN, long lsn)2134     public void trackSyncableVLSN(VLSN syncableVLSN, long lsn) {
2135         cbvlsnTracker.track(syncableVLSN, lsn);
2136     }
2137 
2138     /**
2139      * Returns the group wide CBVLSN. The group CBVLSN is computed as the
2140      * minimum of CBVLSNs after discarding CBVLSNs that are obsolete. A CBVLSN
2141      * is considered obsolete, if it has not been updated within a configurable
2142      * time interval relative to the time that the most recent CBVLSN was
2143      * updated.  Also considers VLSNs protected by REPLAY_COST_PERCENT.
2144      *
2145      * <p>May return NULL_VLSN.
2146      */
getGroupCBVLSN()2147     public VLSN getGroupCBVLSN() {
2148         return VLSN.min(globalCBVLSN.getCBVLSN(), replayCostMinVLSN);
2149     }
2150 
2151     /**
2152      * Marks the start of the search for a matchpoint that happens during a
2153      * syncup.  The globalCBVLSN can't be changed when a syncup is in
2154      * progress. A feeder may have multiple syncups in action. The caller
2155      * should call {@link #syncupEnded} when the syncup is done.
2156      */
syncupStarted()2157     public void syncupStarted() {
2158         globalCBVLSN.syncupStarted();
2159     }
2160 
syncupEnded()2161     public void syncupEnded() {
2162         globalCBVLSN.syncupEnded();
2163     }
2164 
2165     /**
2166      * Returns the file number that forms a barrier for the cleaner's file
2167      * deletion activities. Files with numbers >= this file number cannot be
2168      * removed by the cleaner without disrupting the replication stream.
2169      *
2170      * @return the file number that's the barrier for cleaner file deletion
2171      *
2172      * @throws DatabaseException
2173      */
getCleanerBarrierFile()2174     public long getCleanerBarrierFile()
2175         throws DatabaseException {
2176 
2177         /*
2178          * It turns out that this method is only called in contexts that
2179          * specify an active syncup is underway.  Neither the global CBVLSN or
2180          * the VLSNIndex can change during an active syncup, so being called
2181          * during an active syncup means we can depend on the global CBVLSN
2182          * having an entry in the VLSNIndex.  The check below that there are
2183          * active syncups underway doesn't guarantee that the syncup will
2184          * remain in effect during the whole call, but is a good indication
2185          * that we can depend on the global CBVLSN having an entry in the
2186          * VLSNIndex, as needed later in this method.
2187          */
2188         if (globalCBVLSN.getActiveSyncups() <= 0) {
2189             throw new IllegalStateException(
2190             "getCleanerBarrierFile should only be called during" +
2191             " an active syncup");
2192         }
2193 
2194         /*
2195          * Take the minimum of GlobalCBVLSN, replayCostMinVLSN, and
2196          * SyncCleanerBarrier
2197          */
2198         final VLSN globalCBVLSNValue = globalCBVLSN.getCBVLSN();
2199         final VLSN cbvlsn = VLSN.min(globalCBVLSNValue, replayCostMinVLSN);
2200 
2201         if (logger.isLoggable(Level.FINE)) {
2202             LoggerUtils.fine(
2203                 logger, repImpl,
2204                 "Computing getCleanerBarrierFile:" +
2205                 " GlobalCBVLSN=" + globalCBVLSNValue +
2206                 " replayCostMinVLSN=" + replayCostMinVLSN +
2207                 " CBVLSN=" + cbvlsn);
2208         }
2209 
2210         /* The GlobalCBVLSN can be null, putting it outside the VLSN index */
2211         if (cbvlsn.isNull()) {
2212             return 0;
2213         }
2214         return repImpl.getVLSNIndex().getLTEFileNumber(cbvlsn);
2215     }
2216 
2217     /**
2218      * Protects additional files from deletion by returning a subset of the
2219      * specified unprotected files.
2220      *
2221      * <p>The unprotectedFiles parameter provides a collection of files that
2222      * the caller believes are safe to delete.  These files have had their live
2223      * data migrated by the cleaner, and they are not in use by DbBackup or
2224      * DataSync.  This method decides which of the unprotected files should be
2225      * removed from the return value to protect them from deletion so that they
2226      * can be used to stream replication data to replay at replicas.
2227      *
2228      * <p>The diagram below represents log files organized by the time they
2229      * were created, with the oldest file at the left and the newest at the
2230      * right.
2231      *
2232      * <pre>
2233      *
2234      * <-- Older                                             Newer -->
2235      *
2236      * All files:
2237      *
2238      *   Region 1  |        Region 2         |  Region 3  |  Region 4
2239      * ------------A------------X------------B------------C------------
2240      *             |            |            |            |
2241      *             Start of     CBVLSN       Global       Last
2242      *             VLSN Index   |            CBVLSN       VLSN
2243      *                          |                         |
2244      * From unprotected files:  |                         |
2245      *                          |                         |
2246      *     Truncated files      |     Retained files      | Barren files
2247      *
2248      * </pre>
2249      *
2250      * <p>Note that the unprotected files represent a subset of the files in
2251      * each region of the diagram.
2252      *
2253      * <p>Files in region 1 are files that are not represented in the VLSN
2254      * Index.  The mappings between LSNs and VLSNs maintained by the VLSN Index
2255      * are needed to support replays, so these files cannot be used for
2256      * replication.
2257      *
2258      * <p>Region 1 files consist primarily of live files containing data needed
2259      * for recovery, typically with a high enough percentage of live data that
2260      * the cleaner does not consider them worth cleaning.  There are usually
2261      * gaps in the sequence of files in this region because some of these older
2262      * files have been removed after being cleaned.
2263      *
2264      * <p>Note that entries are removed from the VLSN Index before the
     * associated log files are deleted.  If log files are found to be in
2266      * use by DbBackup, or the system crashes before the files can be deleted,
2267      * it is possible for cleaned log files to appear in region 1 that do not
2268      * have associated mappings in the VLSN index.  That is the reason why it
2269      * is possible for this region to contain some unprotected files.
2270      *
2271      * <p>Files in region 2 are files that are represented in the VLSN index
2272      * and so can be used for replication.  Because these files only contain
2273      * VLSNs before the Global CBVLSN, they are not known to be needed to
2274      * support replication for electable replicas that have been in contact
2275      * with the master within the time period represented by the
2276      * REP_STREAM_TIMEOUT parameter, or by any replica currently performing
2277      * replication, but they might still be useful to secondary nodes that are
2278      * out of contact or electable nodes that have been out of contact for
2279      * longer than REP_STREAM_TIMEOUT.
2280      *
2281      * <p>The system uses the values of the REPLAY_COST_PERCENT and
2282      * REPLAY_FREE_DISK_PERCENT parameters to determine if it is worthwhile to
2283      * retain cleaned files in region 2, while also satisfying the requested
2284      * free disk space target.  The system needs to retain a contiguous subset
2285      * of the files at the upper end of this range in order for them to be
2286      * useful for replication, since replication requires an uninterrupted
2287      * sequence of VLSNs.  The system will select a VLSN between points A and B
2288      * as point X, which represents the lowest VLSN for files that should be
2289      * protected.
2290      *
2291      * <p>Files in region 3 are files that are known to be needed for
2292      * replication by electable nodes, so otherwise unprotected files in this
2293      * range are retained for that reason.
2294      *
2295      * <p>Files in region 4 are files that do not contain any VLSNs.  These
2296      * files include log entries to represent changes to the structure of the
2297      * Btree, but that are not needed for replication.  Cleaned files in this
2298      * region are candidates for deletion because they are not needed for
2299      * either replication or network restore.
2300      *
     * <p>The unprotected files passed to getUnprotectedFileSet consist of
2302      * cleaned files that appear in the four regions described above.
2303      *
2304      * <p>The unprotected files in region 1 will always be removed, and are
2305      * called "truncated files".
2306      *
2307      * <p>Unprotected files in region 2 below point X, the final CBVLSN value,
2308      * will be removed, and are also called "truncated files".
2309      *
2310      * <p>Unprotected files in region 3 will be retained because they are known
2311      * to be needed for replication.
2312      *
2313      * <p>Unprotected files in region 4 will always be removed because they are
2314      * not needed for replication or replay.  These files are called "barren
2315      * files".
2316      *
2317      * <p>Note that the Global CBVLSN increases over time, which moves files
2318      * from region 3 into region 2, where they can be evaluated for retention
2319      * based on replay cost.  The minimum VLSN of the files protected by replay
2320      * cost also increases over time, allowing the low end of the VLSN Index to
2321      * be truncated.
2322      *
2323      * @param unprotectedFiles set of file numbers of the files that are
2324      * potential candidates for deletion, before taking into consideration the
2325      * sync-up needs of other nodes in the replication group
2326      *
     * @param cleaner the cleaner, which provides the additional information
     * about log files that the implementation needs
2329      *
2330      * @return a subset of the original set, where files that we want to
2331      * protect have been removed.  We never destructively modify the input set.
2332      * Instead we return either a restricted view of the original set or a new
2333      * set with copies of some of the file numbers from the original set.
2334      */
getUnprotectedFileSet( final NavigableSet<Long> unprotectedFiles, final Cleaner cleaner)2335     public NavigableSet<Long> getUnprotectedFileSet(
2336         final NavigableSet<Long> unprotectedFiles,
2337         final Cleaner cleaner) {
2338 
2339         final VLSN globalCBVLSNValue = globalCBVLSN.getCBVLSN();
2340         final long globalCBVLSNFile = globalCBVLSNValue.isNull() ?
2341             0 :
2342             getVLSNIndex().getLTEFileNumber(globalCBVLSNValue);
2343         final VLSN lastVLSN = getVLSNIndex().getRange().getLast();
2344         final long lastVLSNFile = getVLSNIndex().getGTEFileNumber(lastVLSN);
2345 
2346         /*
2347          * Compute the minimum VLSN associated with replay cost based on
2348          * REPLAY_COST_PERCENT and REPLAY_FREE_DISK_PERCENT
2349          */
2350         replayCostMinVLSN = computeReplayCostMinVLSN(
2351             unprotectedFiles, cleaner, globalCBVLSNFile, lastVLSNFile);
2352 
2353         /*
2354          * Compute the new CBVLSN as the minimum of globalCBVLSN and
2355          * replayCostMinVLSN
2356          */
2357         final VLSN cbvlsn;
2358         final long cbvlsnFile;
2359         if (!replayCostMinVLSN.isNull() &&
2360             (replayCostMinVLSN.compareTo(globalCBVLSNValue) < 0)) {
2361 
2362             /* Log if the replayCostMinVLSN wins out */
2363             if (logger.isLoggable(Level.INFO)) {
2364                 LoggerUtils.info(
2365                     logger, envImpl,
2366                     "Unprotected file set determined by replay cost:" +
2367                     " GlobalCBVLSN=" + globalCBVLSNValue +
2368                     " replayCostMinVLSN=" + replayCostMinVLSN);
2369             }
2370             cbvlsn = replayCostMinVLSN;
2371             cbvlsnFile = getVLSNIndex().getLTEFileNumber(replayCostMinVLSN);
2372         } else {
2373             cbvlsn = globalCBVLSNValue;
2374             cbvlsnFile = globalCBVLSNFile;
2375             if (logger.isLoggable(Level.FINE)) {
2376                 LoggerUtils.fine(logger, envImpl,
2377                                  "Computing unprotected file set:" +
2378                                  " GlobalCBVLSN=" + globalCBVLSNValue +
2379                                  " replayCostMinVLSN=" + replayCostMinVLSN);
2380             }
2381         }
2382 
2383         /*
2384          * The files we want to preserve range from the one containing the
2385          * CBVLSN, to the one containing the last VLSN.  So there could be up
2386          * to three disjoint subsets of the unprotected files: (1) those before
2387          * the CBVLSN (the "truncated files"), (2) those we want to preserve
2388          * (the "retained files"), and (3) those after the last VLSN (the
2389          * "barren files").  Since we need the retained files in (2), the
2390          * unprotected files we return from this method consist of the files in
2391          * (1) and (3).
2392          */
2393         final NavigableSet<Long> truncatedFiles =
2394             unprotectedFiles.headSet(cbvlsnFile, false);
2395         final NavigableSet<Long> barrenFiles =
2396             unprotectedFiles.tailSet(lastVLSNFile, false);
2397 
2398         if (!barrenFiles.isEmpty()) {
2399             if (logger.isLoggable(Level.INFO)) {
2400                 LoggerUtils.info
2401                    (logger, envImpl,
2402                     "CBVLSN file is 0x" +
2403                      Long.toHexString(cbvlsnFile) +
2404                      " but these files have no VLSNs and can be deleted: " +
2405                      enumerateFiles(barrenFiles));
2406 
2407             }
2408         }
2409 
2410         /*
2411          * enumerateFiles() can be expensive, so only generate this String
2412          * if logging is set to FINER
2413          */
2414         if (logger.isLoggable(Level.FINER)) {
2415             if (!truncatedFiles.isEmpty()) {
2416                 logger.finer("Known unused files before CBVLSN start: " +
2417                              enumerateFiles(truncatedFiles));
2418             }
2419             logger.finer("Candidates for deletion: " +
2420                          enumerateFiles(unprotectedFiles));
2421         }
2422 
2423         NavigableSet<Long> result;
2424         if (barrenFiles.isEmpty()) {
2425             result = truncatedFiles;
2426         } else if (truncatedFiles.isEmpty()) {
2427             result = barrenFiles;
2428         } else {
2429 
2430             /*
2431              * The result needs to be made up from two disjoint subsets of the
2432              * original set, so we can't simply return a view of that set.
2433              */
2434             result = new TreeSet<Long>(truncatedFiles);
2435             result.addAll(barrenFiles);
2436         }
2437 
2438         if (result.isEmpty()) {
2439             LoggerUtils.traceAndLog(
2440                 logger, repImpl, Level.WARNING,
2441                 "Replication prevents deletion of " + unprotectedFiles.size() +
2442                 " files by Cleaner. " +
2443                 "Start file=0x" + Long.toHexString(cbvlsnFile) +
2444                 " holds CBVLSN " + cbvlsn +
2445                 ", end file=0x" + Long.toHexString(lastVLSNFile) +
2446                 " holds last VLSN " + lastVLSN);
2447         }
2448         return result;
2449     }
2450 
2451     /**
2452      * Returns the minimum VLSN of files that should be retained to support
2453      * replay, or NULL_VLSN if disabled or no files are found to protect.
2454      */
computeReplayCostMinVLSN( final NavigableSet<Long> unprotectedFiles, final Cleaner cleaner, final long globalCBVLSNFile, final long lastVLSNFile)2455     private VLSN computeReplayCostMinVLSN(
2456         final NavigableSet<Long> unprotectedFiles,
2457         final Cleaner cleaner,
2458         final long globalCBVLSNFile,
2459         final long lastVLSNFile) {
2460 
2461         /* Check if disabled */
2462         if (replayCostPercent == 0) {
2463             if (logger.isLoggable(Level.FINE)) {
2464                 LoggerUtils.fine(logger, repImpl,
2465                                  "replayCostPercent is disabled");
2466             }
2467             return VLSN.NULL_VLSN;
2468         }
2469 
2470         /*
2471          * Use FileSummary information to determine log file sizes.  Specify
2472          * false for the includeTrackedFiles parameter to say that no
2473          * information is needed about changes made since the last checkpoint
2474          * because this method is called immediately after a checkpoint.
2475          */
2476         final Map<Long, FileSummary> fileSummaryMap =
2477             cleaner.getUtilizationProfile().getFileSummaryMap(false);
2478 
2479         /* Prepare to check free disk space, if enabled */
2480         SpaceInfo spaceInfo = (replayFreeDiskPercent != 0) ?
2481             new SpaceInfo(fileSummaryMap, unprotectedFiles, globalCBVLSNFile,
2482                           lastVLSNFile) :
2483             null;
2484 
2485         /*
2486          * Compute the number of bytes needed for a network restore, including
2487          * files that are already protected, plus any files containing VLSNs
2488          * between the GlobalCBVLSN and the last VLSN, because they are slated
2489          * to be protected.
2490          */
2491         long networkRestoreBytes = 0;
2492         for (final Entry<Long, FileSummary> ent : fileSummaryMap.entrySet()) {
2493             final long file = ent.getKey();
2494             if (!unprotectedFiles.contains(file) ||
2495                 ((file >= globalCBVLSNFile) && (file <= lastVLSNFile))) {
2496                 final FileSummary fileSummary = ent.getValue();
2497                 networkRestoreBytes += fileSummary.totalSize;
2498             }
2499         }
2500 
2501         /*
2502          * Compute the maximum number of replay bytes as a percentage of the
2503          * network restore bytes
2504          */
2505         final long maxReplayBytes =
2506             (long) (networkRestoreBytes / (replayCostPercent / 100.0));
2507 
2508         /*
2509          * Iterate backwards over the files, from newest to oldest, counting
2510          * the size in bytes until it reaches the maximum number of replay
2511          * bytes worth retaining, and so long as we have enough free space.
2512          * Don't count barren files, since they are not used for replay and
2513          * will be deleted, or files below the VLSNIndex, since those can't be
2514          * used for replay.
2515          */
2516         final VLSN firstVLSN = getVLSNIndex().getRange().getFirst();
2517         final long firstVLSNFile = getVLSNIndex().getLTEFileNumber(firstVLSN);
2518         final FileSelector fileSelector = cleaner.getFileSelector();
2519         long replayBytes = 0;
2520         VLSN newReplayCostMinVLSN = VLSN.NULL_VLSN;
2521         for (final long file : unprotectedFiles.descendingSet()) {
2522 
2523             /* Ignore barren files */
2524             if (file > lastVLSNFile) {
2525                 continue;
2526             }
2527 
2528             /* Done if we pass the start of the VLSN Index */
2529             if (file < firstVLSNFile) {
2530                 break;
2531             }
2532 
2533             /* Ignore deleted files */
2534             final VLSN fileFirstVLSN = fileSelector.getFirstVLSN(file);
2535             if (fileFirstVLSN == null) {
2536                 continue;
2537             }
2538 
2539             /* Check free disk space */
2540             final long fileSize = fileSummaryMap.get(file).totalSize;
2541             if (spaceInfo != null) {
2542                 final FileStoreSpaceInfo fileInfo =
2543                     spaceInfo.getFileInfo(file);
2544                 if (fileInfo != null) {
2545                     if (fileSize > fileInfo.replaySpace) {
2546                         if (logger.isLoggable(Level.INFO)) {
2547                             LoggerUtils.info(
2548                                 logger, envImpl,
2549                                 String.format(
2550                                     "Limited free disk space prevented" +
2551                                     " retaining some log files." +
2552                                     " Retained %,d bytes, but wanted to" +
2553                                     " retain %,d bytes based on replay cost." +
2554                                     " Associated file store: %s",
2555                                     replayBytes, maxReplayBytes, fileInfo));
2556                         }
2557                         break;
2558                     }
2559                     fileInfo.replaySpace -= fileSize;
2560                 }
2561             }
2562             newReplayCostMinVLSN = fileFirstVLSN;
2563             replayBytes += fileSize;
2564 
2565             /* Check if we've reached the maximum of useful replay bytes */
2566             if (replayBytes >= maxReplayBytes) {
2567                 break;
2568             }
2569         }
2570 
2571         if (logger.isLoggable(Level.FINE)) {
2572             LoggerUtils.fine(
2573                 logger, repImpl,
2574                 String.format("Computing replayCostMinVLSN:" +
2575                               " networkRestoreBytes=%,d" +
2576                               " maxReplayBytes=%,d" +
2577                               " replayBytes=%,d" +
2578                               " firstVLSN=%s" +
2579                               " replayCostMinVLSN=%s",
2580                               networkRestoreBytes, maxReplayBytes, replayBytes,
2581                               firstVLSN, newReplayCostMinVLSN));
2582         }
2583         return newReplayCostMinVLSN;
2584     }
2585 
2586     /** Holds space information for a file store. */
2587     private class FileStoreSpaceInfo {
2588         private final FileStoreInfo fileStoreInfo;
2589         final long totalSpace;
2590         final long freeSpace;
2591         long replaySpace;
2592 
FileStoreSpaceInfo(final FileStoreInfo fileStoreInfo)2593         FileStoreSpaceInfo(final FileStoreInfo fileStoreInfo)
2594             throws IOException {
2595 
2596             this.fileStoreInfo = fileStoreInfo;
2597             totalSpace = fileStoreInfo.getTotalSpace();
2598             freeSpace = fileStoreInfo.getUsableSpace();
2599             replaySpace = freeSpace - getTargetFreeSpace();
2600         }
2601 
2602         /**
2603          * Returns the target free disk space for the associate file store.
2604          */
getTargetFreeSpace()2605         private long getTargetFreeSpace() {
2606             return (long) (totalSpace * (replayFreeDiskPercent / 100.0));
2607         }
2608 
2609         @Override
toString()2610         public String toString() {
2611             return String.format("%s: totalSpace=%,d freeSpace=%,d" +
2612                                  " targetFreeSpace=%,d",
2613                                  fileStoreInfo, totalSpace, freeSpace,
2614                                  getTargetFreeSpace());
2615         }
2616     }
2617 
2618     /**
2619      * Maintain information about disk space available for log files used for
2620      * replay, so we can decide whether we have enough free space to retain
2621      * them.  Information is looked up by file, but the return value expresses
2622      * the free space available on the file store (meaning volume or disk)
2623      * associated with the file.
2624      */
2625     private class SpaceInfo {
2626 
2627         /**
2628          * Maps a file store to an object storing space information for log
2629          * files in that file store.
2630          */
2631         private final Map<FileStoreInfo, FileStoreSpaceInfo> fileStoreInfoMap =
2632             new HashMap<FileStoreInfo, FileStoreSpaceInfo>();
2633 
2634         /**
2635          * Maps a file number to the object that stores space information for
2636          * all files in the same file store.  The value is shared for all
2637          * entries in this map associated with the same file store, and is
2638          * shared with the associated value in fileStoreInfoMap.
2639          */
2640         private final Map<Long, FileStoreSpaceInfo> fileMap =
2641             new HashMap<Long, FileStoreSpaceInfo>();
2642 
2643         /** The number of IOExceptions logged. */
2644         private int loggedIOExceptions;
2645 
SpaceInfo(final Map<Long, FileSummary> fileSummaryMap, final NavigableSet<Long> unprotectedFiles, final long globalCBVLSNFile, final long lastVLSNFile)2646         SpaceInfo(final Map<Long, FileSummary> fileSummaryMap,
2647                   final NavigableSet<Long> unprotectedFiles,
2648                   final long globalCBVLSNFile,
2649                   final long lastVLSNFile) {
2650 
2651             /*
2652              * Tally information for all currently unprotected files, including
2653              * barren files, but not files containing VLSNs between the global
2654              * CBVLSN and the last VLSN, because they will be protected
2655              */
2656             for (final long file : unprotectedFiles.descendingSet()) {
2657                 if ((file > globalCBVLSNFile) && (file <= lastVLSNFile)) {
2658                     continue;
2659                 }
2660                 final FileSummary fileSummary = fileSummaryMap.get(file);
2661                 if (fileSummary != null) {
2662                     tallyFile(file, fileSummary);
2663                 }
2664             }
2665         }
2666 
2667         /**
2668          * Returns the disk space information for the file store associated
2669          * with the specified file.
2670          */
getFileInfo(final long file)2671         FileStoreSpaceInfo getFileInfo(final long file) {
2672             return fileMap.get(file);
2673         }
2674 
2675         /**
2676          * Create an entry for the file store associated with the file, if not
2677          * already present, and tally the space used by the individual file.
2678          */
tallyFile(final long file, final FileSummary fileSummary)2679         private void tallyFile(final long file,
2680                                final FileSummary fileSummary) {
2681 
2682             final String fileName = repImpl.getFileManager().getFullFileName(
2683                 file, FileManager.JE_SUFFIX);
2684             try {
2685                 final FileStoreInfo fileStoreInfo =
2686                     FileStoreInfo.getInfo(fileName);
2687                 FileStoreSpaceInfo fileInfo =
2688                     fileStoreInfoMap.get(fileStoreInfo);
2689                 if (fileInfo == null) {
2690 
2691                     /*
2692                      * Set the initial value to the free space available beyond
2693                      * the target amount.  We will then add to this value the
2694                      * amount of space used by replay log files located in this
2695                      * file store.  If the initial value is negative, then the
2696                      * result will be the amount of space we have available to
2697                      * store log files and still maintain the requested free
2698                      * space.  The result may be negative, meaning we can't
2699                      * meet the requirement even if all these replay files are
2700                      * deleted.
2701                      */
2702                     fileInfo = new FileStoreSpaceInfo(fileStoreInfo);
2703                     fileStoreInfoMap.put(fileStoreInfo, fileInfo);
2704                     if (logger.isLoggable(Level.FINE)) {
2705                         LoggerUtils.fine(
2706                             logger, repImpl,
2707                             "Space information for file store " + fileInfo);
2708                     }
2709                 }
2710                 fileMap.put(file, fileInfo);
2711                 fileInfo.replaySpace += fileSummary.totalSize;
2712             } catch (IOException e) {
2713 
2714                 /*
2715                  * Problem getting file store information.  Leave this file out
2716                  * of the summary, which means that it will be retained because
2717                  * we can't figure out which file store it is part of.
2718                  *
2719                  * Only log the first exception at INFO, to reduce clutter.
2720                  */
2721                 final Level level =
2722                     (loggedIOExceptions == 0) ? Level.INFO : Level.FINE;
2723                 if (logger.isLoggable(level)) {
2724                     loggedIOExceptions++;
2725                     LoggerUtils.logMsg(
2726                         logger, repImpl, level,
2727                         "Problem accessing file store info for file " +
2728                         fileName + ": " + e);
2729                 }
2730             }
2731         }
2732     }
2733 
enumerateFiles(Set<Long> fileSet)2734     private String enumerateFiles(Set<Long> fileSet) {
2735         StringBuilder sb = new StringBuilder();
2736         for (Long f : fileSet) {
2737             sb.append(" 0x").append(Long.toHexString(f));
2738         };
2739         return sb.toString();
2740     }
2741 
    /**
     * Returns the current value of the replicaCloseCatchupMs field.
     * NOTE(review): presumably a duration in milliseconds, judging by the
     * name -- confirm against where the field is set.
     */
    long getReplicaCloseCatchupMs() {
        return replicaCloseCatchupMs;
    }
2745 
    /**
     * Returns the Arbiter instance held by this node.
     */
    public Arbiter getArbiter() {
        return arbiter;
    }
2749 
2750     /**
2751      * Shuts down the Network backup service *before* a rollback is initiated
2752      * as part of syncup, thus ensuring that NetworkRestore does not see an
2753      * inconsistent set of log files. Any network backup operations that are in
2754      * progress at this node are aborted. The client of the service will
2755      * experience network connection failures and will retry with this node
2756      * (when the service is re-established at this node), or with some other
2757      * node.
2758      * <p>
     * restartNetworkBackup() is then used to restart the service after it was
2760      * shut down.
2761      */
shutdownNetworkBackup()2762     final public void shutdownNetworkBackup() {
2763         logFeederManager.shutdown();
2764         logFeederManager = null;
2765     }
2766 
2767     /**
2768      * Restarts the network backup service *after* a rollback has been
2769      * completed and the log files are once again in a consistent state.
2770      */
restartNetworkBackup()2771     final public void restartNetworkBackup() {
2772         if (logFeederManager != null) {
2773             throw EnvironmentFailureException.unexpectedState(repImpl);
2774         }
2775         logFeederManager=
2776             new com.sleepycat.je.rep.impl.networkRestore.FeederManager
2777             (serviceDispatcher, repImpl, nameIdPair);
2778     }
2779 
2780     /*
2781      * Used to create deliberate clock skews for testing purposes. Replicator
2782      * code should use it instead of invoking System.currentTimeMillis()
2783      * directly.
2784      */
2785     public static class Clock {
2786         private final int skewMs;
2787 
Clock(int skewMs)2788         private Clock(int skewMs) {
2789             this.skewMs = skewMs;
2790         }
2791 
currentTimeMillis()2792         public long currentTimeMillis() {
2793             return System.currentTimeMillis() + skewMs;
2794         }
2795     }
2796 
2797     /**
2798      * Dumps the states associated with any active Feeders as well as
2799      * information pertaining to the group CBVLSN and the composition of the
2800      * group itself.
2801      */
dumpState()2802     public String dumpState() {
2803         return  "\n" + feederManager.dumpState(false /* acksOnly */) +
2804             "\nGlobalCBVLSN=" + getGroupCBVLSN() +
2805             "\n" + getGroup();
2806     }
2807 
2808     /**
2809      * Dumps the state associated with all active Feeders that supply
2810      * acknowledgments.
2811      */
dumpAckFeederState()2812     public String dumpAckFeederState() {
2813         return  "\n" + feederManager.dumpState(true /* acksOnly */) + "\n";
2814     }
2815 
    /**
     * Returns the ElectionQuorum instance held by this node.
     */
    public ElectionQuorum getElectionQuorum() {
        return electionQuorum;
    }
2819 
    /**
     * Returns the DurabilityQuorum instance held by this node.
     */
    public DurabilityQuorum getDurabilityQuorum() {
        return durabilityQuorum;
    }
2823 
setConvertHook(TestHook<Integer> hook)2824     public void setConvertHook(TestHook<Integer> hook) {
2825         if (convertHooks == null) {
2826             convertHooks = new HashSet<TestHook<Integer>>();
2827         }
2828         convertHooks.add(hook);
2829     }
2830 
runConvertHooks()2831     private boolean runConvertHooks () {
2832         if (convertHooks == null) {
2833             return true;
2834         }
2835 
2836         for (TestHook<Integer> h : convertHooks) {
2837             assert TestHookExecute.doHookIfSet(h, 0);
2838         }
2839         return true;
2840     }
2841 
2842     /**
2843      * Get the group minimum JE version.
2844      *
2845      * <p>Returns the minimum JE version that is required for all nodes that
2846      * join this node's replication group.  The version returned is supported
2847      * by all current and future group members.  The minimum JE version is
2848      * guaranteed to only increase over time, so long as the data for the
2849      * environment is not rolled back or lost.
2850      *
2851      * @return the group minimum JE version
2852      */
getMinJEVersion()2853     public JEVersion getMinJEVersion() {
2854         synchronized (minJEVersionLock) {
2855             return group.getMinJEVersion();
2856         }
2857     }
2858 
2859     /**
2860      * Checks if all data nodes in the replication group support the specified
2861      * JE version.  Updates the group minimum JE version, and the group format
2862      * version, as needed to require all nodes joining the group to be running
2863      * at least the specified JE version.
2864      *
2865      * <p>This method should only be called on the master, because attempts to
     * update the rep group DB on a replica will fail.
2867      *
2868      * @param newMinJEVersion the new minimum JE version
2869      * @throws DatabaseException if an error occurs when accessing the
2870      * replication group database
2871      * @throws MinJEVersionUnsupportedException if the version is not supported
2872      * by one or more current group members
2873      */
setMinJEVersion(final JEVersion newMinJEVersion)2874     public void setMinJEVersion(final JEVersion newMinJEVersion)
2875         throws MinJEVersionUnsupportedException {
2876 
2877         /*
2878          * Synchronize here on minJEVersionLock to prevent new secondary nodes
2879          * from being added while updating the minimum JE version.  Electable
2880          * nodes are stored in the RepGroupDB, so the check performed on that
2881          * class's setMinJEVersion within a transaction insures that all
2882          * current nodes have been checked before the minimum JE version is
2883          * increased.  But secondary nodes are not stored persistently, so
2884          * other synchronization is needed for them.
2885          */
2886         synchronized (minJEVersionLock) {
2887 
2888             /* Check if at least this version is already required */
2889             final JEVersion groupMinJEVersion = group.getMinJEVersion();
2890             if (groupMinJEVersion.compareTo(newMinJEVersion) >= 0) {
2891                 return;
2892             }
2893 
2894             for (final RepNodeImpl node : group.getDataMembers()) {
2895                 JEVersion nodeJEVersion = node.getJEVersion();
2896                 if (getNodeName().equals(node.getName())) {
2897 
2898                     /* Use the current software version for the local node */
2899                     nodeJEVersion = repImpl.getCurrentJEVersion();
2900                 } else {
2901 
2902                     /* Use the version recorded by the feeder for replicas */
2903                     final Feeder feeder =
2904                         feederManager.getFeeder(node.getName());
2905                     if (feeder != null) {
2906                         final JEVersion currentReplicaJEVersion =
2907                             feeder.getReplicaJEVersion();
2908                         if (currentReplicaJEVersion != null) {
2909                             nodeJEVersion = currentReplicaJEVersion;
2910                         }
2911                     }
2912                 }
2913                 if ((nodeJEVersion == null) ||
2914                     (newMinJEVersion.compareTo(nodeJEVersion) > 0)) {
2915                     throw new MinJEVersionUnsupportedException(
2916                         newMinJEVersion, node.getName(), nodeJEVersion);
2917                 }
2918             }
2919             repGroupDB.setMinJEVersion(newMinJEVersion);
2920         }
2921     }
2922 
2923     /**
2924      * Adds a secondary node to the group.  Assign a node ID and add the node
2925      * to the RepGroupImpl.  Don't notify the monitor: secondary nodes do not
2926      * generate GroupChangeEvents.
2927      *
2928      * @param node the node
2929      * @throws IllegalStateException if the store does not currently support
2930      *         secondary nodes or the node doesn't meet the current minimum JE
2931      *         version
2932      * @throws NodeConflictException if the node conflicts with an existing
2933      *         persistent node
2934      */
addSecondaryNode(final RepNodeImpl node)2935     public void addSecondaryNode(final RepNodeImpl node) {
2936         if (!node.getType().isSecondary()) {
2937             throw new IllegalArgumentException(
2938                 "Attempt to call addSecondaryNode with a" +
2939                 " non-SECONDARY node: " + node);
2940         }
2941         final JEVersion requiredJEVersion =
2942             RepGroupImpl.FORMAT_VERSION_3_JE_VERSION;
2943         try {
2944             setMinJEVersion(requiredJEVersion);
2945         } catch (MinJEVersionUnsupportedException e) {
2946             if (e.nodeVersion == null) {
2947                 throw new IllegalStateException(
2948                     "Secondary nodes are not currently supported." +
2949                     " The version running on node " + e.nodeName +
2950                     " could not be determined," +
2951                     " but this feature requires version " +
2952                     requiredJEVersion.getNumericVersionString() +
2953                     " or later.");
2954             }
2955             throw new IllegalStateException(
2956                 "Secondary nodes are not currently supported." +
2957                 " Node " + e.nodeName + " is running version " +
2958                 e.nodeVersion.getNumericVersionString() +
2959                 ", but this feature requires version " +
2960                 requiredJEVersion.getNumericVersionString() +
2961                 " or later.");
2962         }
2963 
2964         /*
2965          * Synchronize on minJEVersionLock to coordinate with setMinJEVersion
2966          */
2967         synchronized (minJEVersionLock) {
2968             final JEVersion minJEVersion = group.getMinJEVersion();
2969             if (node.getJEVersion().compareTo(minJEVersion) < 0) {
2970                 throw new IllegalStateException(
2971                     "The node does not meet the minimum required version" +
2972                     " for the group." +
2973                     " Node " + node.getNameIdPair().getName() +
2974                     " is running version " + node.getJEVersion() +
2975                     ", but the minimum required version is " +
2976                     minJEVersion);
2977             }
2978             if (!node.getNameIdPair().hasNullId()) {
2979                 throw new IllegalStateException(
2980                     "New secondary node " + node.getNameIdPair().getName() +
2981                     " already has an ID: " + node.getNameIdPair().getId());
2982             }
2983             node.getNameIdPair().setId(secondaryNodeIds.allocateId());
2984             group.addSecondaryNode(node);
2985         }
2986     }
2987 
2988     /**
2989      * Removes a secondary node from the group.  Remove the node from the
2990      * RepGroupImpl and deallocate the node ID.
2991      *
2992      * @param node the node
2993      */
removeSecondaryNode(final RepNodeImpl node)2994     public void removeSecondaryNode(final RepNodeImpl node) {
2995         if (!node.getType().isSecondary()) {
2996             throw new IllegalArgumentException(
2997                 "Attempt to call removeSecondaryNode with a" +
2998                 " non-SECONDARY node: " + node);
2999         }
3000         group.removeSecondaryNode(node);
3001         secondaryNodeIds.deallocateId(node.getNodeId());
3002     }
3003 
3004     /**
3005      * Track node IDs for secondary nodes.  IDs are allocated from the specified
3006      * number of values at the high end of the range of integers.
3007      */
3008     static class SecondaryNodeIds {
3009         private final int size;
3010         private final BitSet bits;
3011 
3012         /** Creates an instance that allocates the specified number of IDs. */
SecondaryNodeIds(final int size)3013         SecondaryNodeIds(final int size) {
3014             this.size = size;
3015             assert size > 0;
3016             bits = new BitSet(size);
3017         }
3018 
3019         /**
3020          * Allocates a free ID, throwing IllegalStateException if none are
3021          * available.
3022          */
allocateId()3023         synchronized int allocateId() {
3024 
3025             /*
3026              * Note that scanning for the next clear bit is somewhat
3027              * inefficient, but this inefficiency shouldn't matter given the
3028              * small number of secondary nodes expected.  If needed, the next
3029              * improvement would probably be to remember the last allocated ID,
3030              * to avoid repeated scans of an initial range of already allocated
3031              * bits.
3032              */
3033             final int pos = bits.nextClearBit(0);
3034             if (pos >= size) {
3035                 throw new IllegalStateException("No more secondary node IDs");
3036             }
3037             bits.set(pos);
3038             return Integer.MAX_VALUE - pos;
3039         }
3040 
3041         /**
3042          * Deallocates a previously allocated ID, throwing
3043          * IllegalArgumentException if the argument was not allocated by
3044          * allocateId or if the ID is not currently allocated.
3045          */
deallocateId(final int id)3046         synchronized void deallocateId(final int id) {
3047             if (id < Integer.MAX_VALUE - size) {
3048                 throw new IllegalArgumentException(
3049                     "Illegal secondary node ID: " + id);
3050             }
3051             final int pos = Integer.MAX_VALUE - id;
3052             if (!bits.get(pos)) {
3053                 throw new IllegalArgumentException(
3054                     "Secondary node ID is not currently allocated: " + id);
3055             }
3056             bits.clear(pos);
3057         }
3058     }
3059 }
3060