1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 package com.sleepycat.je.rep.impl.node;
9 
10 import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.N_FEEDERS_CREATED;
11 import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.N_FEEDERS_SHUTDOWN;
12 import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.N_MAX_REPLICA_LAG;
13 import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.N_MAX_REPLICA_LAG_NAME;
14 
15 import java.io.IOException;
16 import java.util.Collections;
17 import java.util.HashMap;
18 import java.util.HashSet;
19 import java.util.Map;
20 import java.util.Map.Entry;
21 import java.util.Set;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.logging.Logger;
27 
28 import com.sleepycat.je.DatabaseException;
29 import com.sleepycat.je.EnvironmentFailureException;
30 import com.sleepycat.je.StatsConfig;
31 import com.sleepycat.je.rep.UnknownMasterException;
32 import com.sleepycat.je.rep.impl.RepNodeImpl;
33 import com.sleepycat.je.rep.net.DataChannel;
34 import com.sleepycat.je.rep.stream.MasterStatus.MasterSyncException;
35 import com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition;
36 import com.sleepycat.je.rep.utilint.IntRunningTotalStat;
37 import com.sleepycat.je.rep.utilint.LongMaxZeroStat;
38 import com.sleepycat.je.rep.utilint.RepUtils;
39 import com.sleepycat.je.rep.utilint.SizeAwaitMap;
40 import com.sleepycat.je.rep.utilint.SizeAwaitMap.Predicate;
41 import com.sleepycat.je.utilint.IntStat;
42 import com.sleepycat.je.utilint.LoggerUtils;
43 import com.sleepycat.je.utilint.StatGroup;
44 import com.sleepycat.je.utilint.StringStat;
45 import com.sleepycat.je.utilint.VLSN;
46 
/**
 * FeederManager is responsible for the creation and management of the Feeders
 * used to respond to connections initiated by a Replica. runFeeders() is the
 * central loop that listens for replica connections and manages the lifecycle
 * of individual Feeders. It's re-entered each time the node becomes a Master
 * and is exited when its status changes.
 *
 * There is a single instance of FeederManager that is created for a
 * replication node. There are many instances of Feeders per FeederManager.
 * Each Feeder instance represents an instance of a connection between the node
 * serving as the feeder and the replica.
 *
 * Note that the FeederManager and the Replica currently reuse the Replication
 * node's thread of control. When we implement r2r we will need to revisit the
 * thread management to provide for concurrent operation of the FeederManager
 * and the Replica.
 */
64 final public class FeederManager {
65 
66     private final RepNode repNode;
67 
68     /*
69      * The queue into which the ServiceDispatcher queues socket channels for
70      * new Feeder instances.
71      */
72     private final BlockingQueue<DataChannel> channelQueue =
73         new LinkedBlockingQueue<DataChannel>();
74 
75     /*
76      * Feeders are stored in either nascentFeeders or activeFeeders, and not
77      * both.  To avoid deadlock, if locking both collections, lock
78      * nascentFeeders first and then activeFeeders.
79      */
80 
81     /*
82      * Nascent feeders that are starting up and are not yet active. They have
83      * network connections but have not synched up or completed handshakes.
84      * They are moved into the feeder map, once they become active.
85      */
86     private final Set<Feeder> nascentFeeders =
87         Collections.synchronizedSet(new HashSet<Feeder>());
88 
89     /*
90      * The collection of active feeders currently feeding replicas. The map is
91      * indexed by the Replica's node name. Access to this map must be
92      * synchronized, since it's updated concurrently by the Feeders that share
93      * it.
94      *
95      * A feeder is considered to be active after it has completed the handshake
96      * sequence with its associated Replica.
97      *
98      * Note that the SizeAwaitMap will only wait for feeders that are connected
99      * to electable replicas, since those are the only ones participating in
100      * durability decisions.
101      */
102     private final SizeAwaitMap<String, Feeder> activeFeeders;
103 
104     /*
105      * A test delay introduced in the feeder loop to simulate a loaded master.
106      * The feeder waits this amount of time after each message is sent.
107      */
108     private int testDelayMs=0;
109 
110     /* Set to true to force a shutdown of the FeederManager. */
111     AtomicBoolean shutdown = new AtomicBoolean(false);
112 
113     /*
114      * Non null if the replication node must be shutdown as well. This is
115      * typically the result of an unexpected exception in the feeder.
116      */
117     private RuntimeException repNodeShutdownException;
118     private final Logger logger;
119 
120     /* FeederManager statistics. */
121     private final StatGroup stats;
122     private final IntStat nFeedersCreated;
123     private final IntStat nFeedersShutdown;
124 
    /*
     * The maximum lag across all replicas. Atomic values or synchronization
     * are not used for the shared statistic to minimize overheads; the
     * resulting occasional inaccuracy in the statistics is an acceptable
     * tradeoff.
     */
131     private final LongMaxZeroStat nMaxReplicaLag;
132     private final StringStat nMaxReplicaLagName;
133 
134     /* The frequency with which the Feeder checks for a master change */
135     public static final int MASTER_CHANGE_CHECK_TIMEOUT = 1000;
136 
137     /* Identifies the Feeder Service. */
138     public static final String FEEDER_SERVICE = "Feeder";
139 
FeederManager(RepNode repNode)140     FeederManager(RepNode repNode) {
141         this.repNode = repNode;
142         activeFeeders = new SizeAwaitMap<String, Feeder>(
143             repNode.getRepImpl(), new MatchElectableFeeders());
144         logger = LoggerUtils.getLogger(getClass());
145         stats = new StatGroup(FeederManagerStatDefinition.GROUP_NAME,
146                               FeederManagerStatDefinition.GROUP_DESC);
147         nFeedersCreated = new IntRunningTotalStat(stats, N_FEEDERS_CREATED);
148         nFeedersShutdown = new IntRunningTotalStat(stats, N_FEEDERS_SHUTDOWN);
149         nMaxReplicaLag = new LongMaxZeroStat(stats, N_MAX_REPLICA_LAG);
150         nMaxReplicaLagName = new StringStat(stats, N_MAX_REPLICA_LAG_NAME);
151     }
152 
153     /**
154      * A SizeAwaitMap predicate that matches feeders connected to electable
155      * replicas.
156      */
157     private class MatchElectableFeeders implements Predicate<Feeder> {
158         @Override
match(final Feeder value)159         public boolean match(final Feeder value) {
160 
161             /* The replica node might be null during unit testing */
162             final RepNodeImpl replica = value.getReplicaNode();
163             return (replica != null) &&
164                 repNode.getDurabilityQuorum().replicaAcksQualify(replica);
165         }
166     }
167 
168     /**
169      * Returns the statistics associated with the FeederManager.
170      *
171      * @return the statistics
172      */
getFeederManagerStats(StatsConfig config)173     public StatGroup getFeederManagerStats(StatsConfig config) {
174 
175         synchronized (stats) {
176             return stats.cloneGroup(config.getClear());
177         }
178     }
179 
180     /* Get the protocol stats for this FeederManager. */
getProtocolStats(StatsConfig config)181     public StatGroup getProtocolStats(StatsConfig config) {
182         /* Aggregate stats that have not yet been aggregated. */
183         StatGroup protocolStats =
184             new StatGroup(BinaryProtocolStatDefinition.GROUP_NAME,
185                           BinaryProtocolStatDefinition.GROUP_DESC);
186         synchronized (activeFeeders) {
187             for (Feeder feeder : activeFeeders.values()) {
188                 protocolStats.addAll(feeder.getProtocolStats(config));
189             }
190         }
191 
192         return protocolStats;
193     }
194 
195     /* Reset the feeders' stats of this FeederManager. */
resetStats()196     public void resetStats() {
197         synchronized (stats) {
198             stats.clear();
199         }
200         synchronized (activeFeeders) {
201             for (Feeder feeder : activeFeeders.values()) {
202                 feeder.resetStats();
203             }
204         }
205     }
206 
207     /**
208      * Accumulates statistics from a terminating feeder.
209      * @param stats
210      */
incStats(StatGroup feederStats)211     void incStats(StatGroup feederStats) {
212         synchronized (stats) {
213             stats.addAll(feederStats);
214         }
215     }
216 
getTestDelayMs()217     public int getTestDelayMs() {
218         return testDelayMs;
219     }
220 
setTestDelayMs(int testDelayMs)221     public void setTestDelayMs(int testDelayMs) {
222         this.testDelayMs = testDelayMs;
223     }
224 
225     /**
226      * Returns the RepNode associated with the FeederManager
227      * @return
228      */
repNode()229     RepNode repNode() {
230         return repNode;
231     }
232 
233     /**
234      * Returns the Feeder associated with the node, if such a feeder is
235      * currently active.
236      */
getFeeder(String nodeName)237     public Feeder getFeeder(String nodeName) {
238         return activeFeeders.get(nodeName);
239     }
240 
241     /*
242      * For test use only.
243      */
putFeeder(String nodeName, Feeder feeder)244     public Feeder putFeeder(String nodeName, Feeder feeder) {
245         return activeFeeders.put(nodeName, feeder);
246     }
247 
getnMaxReplicaLag()248     public LongMaxZeroStat getnMaxReplicaLag() {
249         return nMaxReplicaLag;
250     }
251 
getnMaxReplicaLagName()252     public StringStat getnMaxReplicaLagName() {
253         return nMaxReplicaLagName;
254     }
255 
setRepNodeShutdownException(RuntimeException rNSE)256     void setRepNodeShutdownException(RuntimeException rNSE) {
257         this.repNodeShutdownException = rNSE;
258     }
259 
260     /**
261      * The numbers of Replicas currently "active" with this feeder. Active
262      * currently means they are connected. It does not make any guarantees
263      * about where they are in the replication stream. They may, for example,
264      * be too far behind to participate in timely acks.
265      *
266      * @return the active replica count
267      */
activeReplicaCount()268     public int activeReplicaCount() {
269         return activeFeeders.size();
270     }
271 
272     /**
273      * Returns the set of Replicas that are currently active with this feeder.
274      * A replica is active if it has completed the handshake sequence.
275      *
276      * @return the set of replica node names
277      */
activeReplicas()278     public Set<String> activeReplicas() {
279         synchronized (activeFeeders) {
280 
281             /*
282              * Create a copy to avoid inadvertent concurrency conflicts,
283              * since the keySet is a view of the underlying map.
284              */
285             return new HashSet<String>(activeFeeders.keySet());
286         }
287     }
288 
289     /**
290      * Returns the set of replicas that are currently active with this feeder
291      * and that supply acknowledgments.  A replica is active if it has
292      * completed the handshake sequence.
293      *
294      * @return the set of replica node names
295      */
activeAckReplicas()296     public Set<String> activeAckReplicas() {
297         final Set<String> nodeNames = new HashSet<String>();
298         synchronized (activeFeeders) {
299             for (final Entry<String, Feeder> entry :
300                      activeFeeders.entrySet()) {
301                 final Feeder feeder = entry.getValue();
302 
303                 /* The replica node should be non-null for an active feeder */
304                 final RepNodeImpl replica = feeder.getReplicaNode();
305                 if (replica.getType().isElectable()) {
306                     final String nodeName = entry.getKey();
307                     nodeNames.add(nodeName);
308                 }
309             }
310         }
311         return nodeNames;
312     }
313 
activeReplicasMap()314     public Map<String, Feeder> activeReplicasMap() {
315         synchronized (activeFeeders){
316             return new HashMap<String, Feeder>(activeFeeders);
317         }
318     }
319 
320     /**
321      * Transitions a Feeder to being active, so that it can be used in
322      * considerations relating to commit acknowledgments and decisions about
323      * choosing feeders related to system load.
324      *
325      * @param feeder the feeder being transitioned.
326      *
327      * @throws DuplicateReplicaException if the Feeder is already active
328      */
activateFeeder(Feeder feeder)329     void activateFeeder(Feeder feeder) {
330         synchronized (nascentFeeders) {
331             synchronized (activeFeeders) {
332                 boolean removed = nascentFeeders.remove(feeder);
333                 if (feeder.isShutdown()) {
334                     return;
335                 }
336                 assert(removed);
337                 String replicaName = feeder.getReplicaNameIdPair().getName();
338                 assert(!feeder.getReplicaNameIdPair().equals(NameIdPair.NULL));
339                 Feeder dup = activeFeeders.get(replicaName);
340                 if ((dup != null) && !dup.isShutdown()) {
341                     throw EnvironmentFailureException.
342                         unexpectedState(repNode.getRepImpl(),
343                                         feeder.getReplicaNameIdPair() +
344                                         " is present in both nascent and " +
345                                         "active feeder sets");
346                 }
347                 activeFeeders.put(replicaName, feeder);
348 
349                 MasterTransfer xfr = repNode.getActiveTransfer();
350                 if (xfr != null) {
351                     xfr.addFeeder(feeder);
352                 }
353             }
354         }
355     }
356 
357     /**
358      * Remove the feeder from the sets used to track it. Invoked when a feeder
359      * is shutdown.
360      *
361      * @param feeder
362      */
removeFeeder(Feeder feeder)363     void removeFeeder(Feeder feeder) {
364         assert(feeder.isShutdown());
365         final String replicaName = feeder.getReplicaNameIdPair().getName();
366         synchronized (nascentFeeders) {
367             synchronized (activeFeeders) {
368                 nascentFeeders.remove(feeder);
369                 activeFeeders.remove(replicaName);
370             }
371         }
372 
373         final RepNodeImpl node = feeder.getReplicaNode();
374         if ((node != null) && node.getType().isSecondary()) {
375             repNode.removeSecondaryNode(node);
376         }
377     }
378 
379     /**
380      * Clears and shuts down the runFeeders by inserting a special EOF marker
381      * value into the queue.
382      */
shutdownQueue()383     void shutdownQueue() {
384         if (!repNode.isShutdown()) {
385             throw EnvironmentFailureException.unexpectedState
386                 ("Rep node is still active");
387         }
388         channelQueue.clear();
389         /* Add special entry so that the channelQueue.poll operation exits. */
390         channelQueue.add(RepUtils.CHANNEL_EOF_MARKER);
391     }
392 
393     /**
394      * The core feeder listener loop that is run either in a Master node, or in
395      * a Replica that is serving as a Feeder to other Replica nodes. The core
396      * loop accepts connections from Replicas as they come in and establishes a
397      * Feeder on that connection.
398      *
399      * The loop can be terminated for one of the following reasons:
400      *
401      *  1) A change in Masters.
402      *
403      *  2) A forced shutdown, via a thread interrupt.
404      *
405      *  3) A server socket level exception.
406      *
407      * The timeout on the accept is used to ensure that the check is done at
408      * least once per timeout period.
409      */
runFeeders()410     void runFeeders()
411         throws DatabaseException {
412 
413         if (shutdown.get()) {
414             throw EnvironmentFailureException.unexpectedState
415                 ("Feeder manager was shutdown");
416         }
417         Exception feederShutdownException = null;
418         LoggerUtils.info(logger, repNode.getRepImpl(),
419                          "Feeder manager accepting requests.");
420 
421         /* This updater represents the masters's local cbvlsn, which the master
422            updates directly. */
423         final LocalCBVLSNUpdater updater = new LocalCBVLSNUpdater(
424             repNode.getNameIdPair(), repNode.getNodeType(), repNode);
425         final LocalCBVLSNTracker tracker = repNode.getCBVLSNTracker();
426 
427         try {
428             /*
429              * Ensure that the Global CBVLSN is initialized for the master when
430              * it first comes up; it's subsequently maintained in the loop
431              * below.
432              */
433             updater.updateForMaster(tracker);
434 
435             repNode.getServiceDispatcher().
436                 register(FEEDER_SERVICE, channelQueue);
437 
438             /*
439              * The Feeder is ready for business, indicate that the node is
440              * ready by counting down the latch and releasing any waiters.
441              */
442             repNode.getReadyLatch().countDown();
443 
444             while (true) {
445                 final DataChannel feederReplicaChannel =
446                     channelQueue.poll(MASTER_CHANGE_CHECK_TIMEOUT,
447                                       TimeUnit.MILLISECONDS);
448 
449                 if (feederReplicaChannel == RepUtils.CHANNEL_EOF_MARKER) {
450                     LoggerUtils.info(logger, repNode.getRepImpl(),
451                                      "Feeder manager soft shutdown.");
452                     return;
453                 }
454 
455                 repNode.getMasterStatus().assertSync();
456                 if (feederReplicaChannel == null) {
457                     if (repNode.isShutdownOrInvalid()) {
458                         /* Timeout and shutdown request */
459                         LoggerUtils.info(logger, repNode.getRepImpl(),
460                                          "Feeder manager forced shutdown.");
461                         return;
462                     }
463 
464                     /*
465                      * Take this opportunity to update this node's CBVLSN The
466                      * replicas are sending in their CBVLSNs through the
467                      * heartbeat responses, but a master does not send any
468                      * heartbeat responses, and needs a different path to
469                      * update its local CBVLSN.
470                      */
471                     updater.updateForMaster(tracker);
472                     continue;
473                 }
474 
475                 nFeedersCreated.increment();
476                 try {
477                     Feeder feeder = new Feeder(this, feederReplicaChannel);
478                     nascentFeeders.add(feeder);
479                     feeder.startFeederThreads();
480                 } catch (IOException e) {
481 
482                     /*
483                      * Indicates a feeder socket level exception.
484                      */
485                     LoggerUtils.fine
486                         (logger, repNode.getRepImpl(),
487                          "Feeder I/O exception: " + e.getMessage());
488                     try {
489                         feederReplicaChannel.close();
490                     } catch (IOException e1) {
491                         LoggerUtils.fine
492                             (logger, repNode.getRepImpl(),
493                              "Exception during cleanup." + e.getMessage());
494                     }
495                     continue;
496                 }
497             }
498         } catch (MasterSyncException e) {
499             LoggerUtils.info(logger, repNode.getRepImpl(),
500                              "Master change: " + e.getMessage());
501 
502             feederShutdownException = new UnknownMasterException("Node " +
503                                 repNode.getRepImpl().getName() +
504                                 " is not a master anymore");
505         } catch (InterruptedException e) {
506             if (this.repNodeShutdownException != null) {
507 
508                 /*
509                  * The interrupt was issued to propagate an exception from one
510                  * of the Feeder threads. It's not a normal exit.
511                  */
512                 LoggerUtils.warning(logger, repNode.getRepImpl(),
513                                     "Feeder manager unexpected interrupt");
514                 throw repNodeShutdownException; /* Terminate the rep node */
515             }
516             if (repNode.isShutdown()) {
517                 LoggerUtils.info(logger, repNode.getRepImpl(),
518                                  "Feeder manager interrupted for shutdown");
519                 return;
520             }
521             feederShutdownException = e;
522             LoggerUtils.warning(logger, repNode.getRepImpl(),
523                                 "Feeder manager unexpected interrupt");
524         } finally {
525             repNode.resetReadyLatch(feederShutdownException);
526             repNode.getServiceDispatcher().cancel(FEEDER_SERVICE);
527             shutdownFeeders(feederShutdownException);
528             LoggerUtils.info(logger, repNode.getRepImpl(),
529                              "Feeder manager exited. CurrentTxnEnd VLSN: " +
530                              repNode.getCurrentTxnEndVLSN());
531         }
532     }
533 
534     /**
535      * Shuts down all the feeders managed by the FeederManager
536      *
537      * @param feederShutdownException the exception provoking the shutdown.
538      */
shutdownFeeders(Exception feederShutdownException)539     private void shutdownFeeders(Exception feederShutdownException) {
540 
541         boolean changed = shutdown.compareAndSet(false, true);
542         if (!changed) {
543             return;
544         }
545 
546         try {
547             /* Copy sets for safe iteration in the presence of deletes.*/
548             final Set<Feeder> feederSet;
549             synchronized (nascentFeeders) {
550                 synchronized (activeFeeders) {
551                     feederSet = new HashSet<Feeder>(activeFeeders.values());
552                     feederSet.addAll(nascentFeeders);
553                 }
554             }
555 
556             for (Feeder feeder : feederSet) {
557                 nFeedersShutdown.increment();
558                 feeder.shutdown(feederShutdownException);
559             }
560         } finally {
561             if (feederShutdownException == null) {
562                 feederShutdownException =
563                     new IllegalStateException("FeederManager shutdown");
564             }
565             activeFeeders.clear(feederShutdownException);
566             nascentFeeders.clear();
567         }
568     }
569 
570     /**
571      * Shuts down a specific feeder. It's typically done in response to the
572      * removal of a member from the group.
573      */
shutdownFeeder(RepNodeImpl node)574     public void shutdownFeeder(RepNodeImpl node) {
575         Feeder feeder = activeFeeders.get(node.getName());
576         if (feeder == null) {
577             return;
578         }
579         nFeedersShutdown.increment();
580         feeder.shutdown(null);
581     }
582 
583     /**
584      * Block until the required number of electable feeders/replica connections
585      * are established. Used for establishing durability quorums. Since this is
586      * counting feeder/replica connections, requiredReplicaCount does not
587      * include the master.
588      */
awaitFeederReplicaConnections( int requiredReplicaCount, long insufficientReplicasTimeout)589     public boolean awaitFeederReplicaConnections(
590         int requiredReplicaCount, long insufficientReplicasTimeout)
591         throws InterruptedException {
592 
593         return activeFeeders.sizeAwait(requiredReplicaCount,
594                                        insufficientReplicasTimeout,
595                                        TimeUnit.MILLISECONDS);
596     }
597 
598     /*
599      * For debugging help, and for expanded exception messages, dump feeder
600      * related state.  If acksOnly is true, only include information about
601      * feeders for replicas that supply acknowledgments.
602      */
dumpState(final boolean acksOnly)603     public String dumpState(final boolean acksOnly) {
604         StringBuilder sb = new StringBuilder();
605         synchronized (activeFeeders) {
606             Set<Map.Entry<String, Feeder>> feeds = activeFeeders.entrySet();
607             if (feeds.size() == 0) {
608                 sb.append("No feeders.");
609             } else {
610                 sb.append("Current feeds:");
611                 for (Map.Entry<String, Feeder> feedEntry : feeds) {
612                     final Feeder feeder = feedEntry.getValue();
613 
614                     /*
615                      * Ignore secondary nodes if only want nodes that provide
616                      * acknowledgments
617                      */
618                     if (acksOnly &&
619                         feeder.getReplicaNode().getType().isSecondary()) {
620                         continue;
621                     }
622                     sb.append("\n ").append(feedEntry.getKey()).append(": ");
623                     sb.append(feeder.dumpState());
624                 }
625             }
626         }
627         return sb.toString();
628     }
629 
630     /**
631      * Returns a count of the number of feeders whose replicas are counted in
632      * durability decisions and have acknowledged txn-end VLSNs >= the
633      * commitVLSN argument.
634      *
635      * @param commitVLSN the commitVLSN being checked
636      */
getNumCurrentAckFeeders(VLSN commitVLSN)637     public int getNumCurrentAckFeeders(VLSN commitVLSN) {
638         final DurabilityQuorum durabilityQuorum =
639             repNode.getDurabilityQuorum();
640         int count = 0;
641         synchronized (activeFeeders) {
642             for (Feeder feeder : activeFeeders.values()) {
643                 if ((commitVLSN.compareTo(feeder.getReplicaTxnEndVLSN()) <= 0)
644                     && durabilityQuorum.replicaAcksQualify(
645                         feeder.getReplicaNode())) {
646                     count++;
647                 }
648             }
649             return count;
650         }
651     }
652 }
653