1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 package com.sleepycat.je.rep.elections;
9 
10 import static com.sleepycat.je.rep.elections.ProposerStatDefinition.PHASE1_NO_NON_ZERO_PRIO;
11 import static com.sleepycat.je.rep.elections.ProposerStatDefinition.PHASE1_NO_QUORUM;
12 import static com.sleepycat.je.rep.elections.ProposerStatDefinition.PROMISE_COUNT;
13 import static org.junit.Assert.assertEquals;
14 import static org.junit.Assert.assertFalse;
15 
16 import java.io.IOException;
17 import java.util.LinkedList;
18 import java.util.List;
19 import java.util.concurrent.CountDownLatch;
20 
21 import org.junit.After;
22 import org.junit.Before;
23 import org.junit.Test;
24 
25 import com.sleepycat.je.rep.QuorumPolicy;
26 import com.sleepycat.je.rep.ReplicationNetworkConfig;
27 import com.sleepycat.je.rep.ReplicationConfig;
28 import com.sleepycat.je.rep.arbitration.Arbiter;
29 import com.sleepycat.je.rep.elections.Acceptor.SuggestionGenerator;
30 import com.sleepycat.je.rep.elections.Proposer.Proposal;
31 import com.sleepycat.je.rep.elections.Protocol.Value;
32 import com.sleepycat.je.rep.impl.RepGroupImpl;
33 import com.sleepycat.je.rep.impl.RepNodeImpl;
34 import com.sleepycat.je.rep.impl.node.ElectionQuorum;
35 import com.sleepycat.je.rep.impl.node.NameIdPair;
36 import com.sleepycat.je.rep.impl.node.RepNode;
37 import com.sleepycat.je.rep.net.DataChannelFactory;
38 import com.sleepycat.je.rep.utilint.RepTestUtils;
39 import com.sleepycat.je.rep.utilint.ServiceDispatcher;
40 import com.sleepycat.je.rep.utilint.net.DataChannelFactoryBuilder;
41 import com.sleepycat.util.test.TestBase;
42 
43 /**
44  * Tests for elections as a whole.
45  */
46 public class ElectionsTest extends TestBase {
47 
48     /* Number of nodes in the test */
49     private static final int nodes = 3;
50     private static final int monitors = 1;
51     private int nretries;
52 
53     private final Object notificationsLock = new Object();
54     private int listenerNotifications = 0;
55 
56     private final ReplicationConfig repConfig[] =
57         new ReplicationConfig[nodes + 1];
58     // private Monitor monitor;
59     private boolean monitorInvoked = false;
60 
61     private final List<Elections> electionNodes = new LinkedList<Elections>();
62     private MasterValue winningValue = null;
63 
64     /* Latch to ensure that required all listeners have made it through. */
65     CountDownLatch listenerLatch;
66 
67     private RepGroupImpl repGroup = null;
68 
69     @Override
70     @Before
setUp()71     public void setUp() throws IOException {
72         repGroup = RepTestUtils.createTestRepGroup(nodes, monitors);
73         for (RepNodeImpl rn : repGroup.getAllElectableMembers()) {
74             ReplicationConfig config = new ReplicationConfig();
75             config.setRepNetConfig(
76                 ReplicationNetworkConfig.create(
77                     RepTestUtils.readNetProps()));
78             repConfig[rn.getNodeId()] = config;
79             config.setNodeName(rn.getName());
80             config.setNodeHostPort(rn.getHostName()+ ":" +rn.getPort());
81         }
82     }
83 
84     @Override
85     @After
tearDown()86     public void tearDown() throws Exception {
87         if ((electionNodes != null) && (electionNodes.size() > 0)) {
88             electionNodes.get(0).shutdownAcceptorsLearners(
89                 repGroup.getAllAcceptorSockets(),
90                 repGroup.getAllHelperSockets());
91 
92             for (Elections node : electionNodes) {
93                 node.getServiceDispatcher().shutdown();
94             }
95         }
96     }
97 
98     /**
99      * Simulates the start up of the first "n" nodes. If < n nodes are started,
100      * the others simulate being down.
101      *
102      * @param nstart nodes to start up
103      * @param groupSize the size of the group
104      * @throws IOException
105      */
startReplicationNodes(final int nstart, final int groupSize, final boolean testPriority)106     public void startReplicationNodes(final int nstart,
107                                       final int groupSize,
108                                       final boolean testPriority)
109         throws IOException {
110 
111         for (short nodeNum = 1; nodeNum <= nstart; nodeNum++) {
112             Elections elections =
113                 new Elections(newRepNode(groupSize, nodeNum, testPriority),
114                               new TestListener(),
115                               newSuggestionGenerator(nodeNum, testPriority));
116             elections.getRepNode().getServiceDispatcher().start();
117             elections.startLearner();
118             elections.participate();
119             electionNodes.add(elections);
120             elections.updateRepGroup(repGroup);
121         }
122 
123         // Start up the Monitor as well.
124         /*
125         InetSocketAddress monitorSocket =
126             repGroup.getMonitors().iterator().next().getLearnerSocket();
127         monitor = new Monitor(repConfig[1].getGroupName(),
128                               monitorSocket,
129                               repGroup);
130         monitor.setMonitorChangeListener(new MonitorChangeListener() {
131             @Override
132             public void replicationChange(MonitorChangeEvent monitorChangeEvent) {
133                 monitorInvoked = true;
134                 assertEquals(winningValue.getMasterNodeId(),
135                         ((NewMasterEvent) monitorChangeEvent).getMasterId());
136             }
137         });
138         monitor.startMonitor();
139         */
140     }
141 
startReplicationNodes(final int nstart, final int groupSize)142     public void startReplicationNodes(final int nstart,
143                                       final int groupSize)
144         throws IOException {
145             startReplicationNodes(nstart, groupSize, false);
146     }
147 
newRepNode(final int groupSize, final short nodeNum, final boolean testPriority)148     private RepNode newRepNode(final int groupSize,
149                                final short nodeNum,
150                                final boolean testPriority)
151         throws IOException {
152 
153         final DataChannelFactory channelFactory =
154             DataChannelFactoryBuilder.construct(
155                 repConfig[nodeNum].getRepNetConfig(),
156                 repConfig[nodeNum].getNodeName());
157         final ServiceDispatcher serviceDispatcher =
158             new ServiceDispatcher(repConfig[nodeNum].getNodeSocketAddress(),
159                                   channelFactory);
160 
161         return new RepNode(new NameIdPair(repConfig[nodeNum].getNodeName(),
162                                           nodeNum),
163                                           serviceDispatcher) {
164             @Override
165             public ElectionQuorum getElectionQuorum() {
166                 return new ElectionQuorum() {
167 
168                     @Override
169                     public boolean haveQuorum(QuorumPolicy quorumPolicy,
170                                               int votes) {
171                         return votes >= quorumPolicy.quorumSize(groupSize);
172                     }
173                 };
174             }
175 
176             @Override
177             public int getElectionPriority() {
178                 return testPriority ? (groupSize - nodeNum + 1) :
179                     repConfig[nodeNum].getNodePriority();
180             }
181 
182             /**
183              * This faked out test node never really does arbitration, but
184              * needs to be able to return an Arbiter instance
185              * for election retries.
186              */
187             @Override
188             public Arbiter getArbiter() {
189                 return new Arbiter(null) {
190                    @Override
191                     public synchronized boolean activateArbitration() {
192                         return false;
193                     }
194                 };
195             }
196         };
197     }
198 
199     private SuggestionGenerator newSuggestionGenerator(final short nodeNum,
200                                                        final boolean testPriority) {
201         return new Acceptor.SuggestionGenerator() {
202             @Override
203             @SuppressWarnings("unused")
204             public Value get(Proposal proposal) {
205                 return new MasterValue("testhost", 9999,
206                                        new NameIdPair("n" + nodeNum,
207                                                       nodeNum));
208             }
209 
210             @Override
211             @SuppressWarnings("unused")
212             public long getRanking(Proposal proposal) {
213                 return testPriority ? 1000l : nodeNum * 10l;
214             }
215         };
216     }
217 
218     public void startReplicationNodes(int nstart)
219         throws IOException {
220         startReplicationNodes(nstart, nstart);
221     }
222 
223     class TestListener implements Learner.Listener {
224 
225         @Override
226         @SuppressWarnings("unused")
227         public void notify(Proposal proposal, Value value) {
228             synchronized (notificationsLock) {
229                 listenerNotifications++;
230             }
231             assertEquals(winningValue, value);
232             listenerLatch.countDown();
233         }
234     }
235 
236     private Elections setupAndRunElection(QuorumPolicy qpolicy,
237                                           int activeNodes,
238                                           int groupSize)
239             throws IOException, InterruptedException {
240 
241         /* Start all of them. */
242         startReplicationNodes(activeNodes, groupSize);
243         winningValue = new MasterValue("testhost", 9999,
244                                        new NameIdPair("n" + (activeNodes),
245                                                       (activeNodes)));
246         return runElection(qpolicy, activeNodes);
247     }
248 
249     private Elections setupAndRunElection(int activeNodes) throws IOException,
250             InterruptedException {
251         return setupAndRunElection(QuorumPolicy.SIMPLE_MAJORITY,
252                                    activeNodes,
253                                    activeNodes);
254     }
255 
256     private Elections setupAndRunElection(int activeNodes, int groupSize)
257         throws IOException, InterruptedException {
258         return setupAndRunElection(QuorumPolicy.SIMPLE_MAJORITY,
259                                    activeNodes,
260                                    groupSize);
261     }
262 
263     private Elections runElection(QuorumPolicy qpolicy, int activeNodes)
264             throws InterruptedException {
265         listenerNotifications = 0;
266         monitorInvoked = false;
267         nretries = 2;
268         listenerLatch = new CountDownLatch(activeNodes);
269         /* Initiate an election on the first node. */
270         Elections testElections = electionNodes.iterator().next();
271 
272         testElections.initiateElection(repGroup, qpolicy, nretries);
273         /* Ensure that Proposer has finished. */
274         testElections.waitForElection();
275         return testElections;
276     }
277 
278     private Elections runElection(int activeNodes)
279         throws InterruptedException {
280 
281         return runElection(QuorumPolicy.SIMPLE_MAJORITY, activeNodes);
282     }
283 
284     /**
285      * Simulates presence of a simple majority, but with prio zero nodes
286      */
287     @Test
288     public void testBasicZeroPrio() throws InterruptedException,
289             IOException {
290 
291         /* Elections with a mix of zero and non-zero prio nodes. */
292         final int majority = (nodes/2);
293         /* Have the first < majority nodes be zero prio. */
294 
295         for (int i=1; i < nodes; i++) {
296             repConfig[i].setNodePriority(0);
297         }
298         setupAndRunElection(nodes);
299         listenerLatch.await();
300         assertEquals(nodes, listenerNotifications);
301 
302         /* Now remove all non-zero prio nodes and try hold an election. */
303 
304         // Now remove one node and the elections should give up after
305         // retries have expired.
306         electionNodes.get(nodes-1).getAcceptor().shutdown();
307         Elections testElections = runElection(nodes);
308         /* No successful elections, hence no notification. */
309         assertEquals(0, listenerNotifications);
310 
311         /* Ensure that all retries were due to lack of a Quorum. */
312         assertEquals
313             (nretries, testElections.getStats().getInt(PHASE1_NO_NON_ZERO_PRIO));
314     }
315 
316     /**
317      * Tests a basic election with everything being normal.
318      */
319     @Test
320     public void testBasicAllNodes()
321         throws InterruptedException, IOException {
322 
323         /* Start all of them. */
324         setupAndRunElection(nodes);
325         listenerLatch.await();
326 
327         assertEquals(nodes, listenerNotifications);
328         // assertTrue(monitorInvoked);
329         runElection(nodes);
330         listenerLatch.await();
331         assertEquals(nodes, listenerNotifications);
332         assertFalse(monitorInvoked);
333     }
334 
335     @Test
336     public void testBasicAllPrioNodes()
337         throws InterruptedException, IOException {
338 
339         /* Start all of them. */
340         startReplicationNodes(nodes, nodes, true);
341         winningValue = new MasterValue("testhost", 9999,
342                                        new NameIdPair("n1", 1));
343         runElection(QuorumPolicy.SIMPLE_MAJORITY, nodes);
344         listenerLatch.await();
345 
346         assertEquals(nodes, listenerNotifications);
347         // assertTrue(monitorInvoked);
348         runElection(nodes);
349         listenerLatch.await();
350         assertEquals(nodes, listenerNotifications);
351         assertFalse(monitorInvoked);
352     }
353 
354     /**
355      * Simulates one node never having come up.
356      */
357     @Test
358     public void testBasicAllButOneNode() throws InterruptedException,
359             IOException {
360         /*
361          * Simulate one node down at startup, but sufficient nodes for a quorum.
362          */
363         setupAndRunElection(nodes - 1);
364         listenerLatch.await();
365         assertEquals(nodes - 1, listenerNotifications);
366         // assertTrue(monitorInvoked);
367     }
368 
369     /**
370      * Tests a basic election with one node having crashed.
371      */
372     @Test
373     public void testBasicOneNodeCrash() throws InterruptedException,
374             IOException {
375         /* Start all of them. */
376         Elections testElections = setupAndRunElection(nodes);
377         listenerLatch.await();
378 
379         assertEquals(nodes, listenerNotifications);
380         // assertTrue(monitorInvoked);
381         assertEquals(nodes, testElections.getStats().getInt(PROMISE_COUNT));
382         electionNodes.get(0).getAcceptor().shutdown();
383         testElections = runElection(nodes);
384         listenerLatch.await();
385         /* The listener should have still obtained a notification. */
386         assertEquals(nodes, listenerNotifications);
387         /* Master unchanged so monitor not invoked */
388         assertFalse(monitorInvoked);
389         assertEquals(nodes - 1, testElections.getStats().getInt(PROMISE_COUNT));
390     }
391 
392     /**
393      * Tests a QuorumPolicy of ALL.
394      */
395     @Test
396     public void testQuorumPolicyAll() throws InterruptedException, IOException {
397 
398         /* Start all of them. */
399         Elections testElections =
400             setupAndRunElection(QuorumPolicy.ALL, nodes, nodes);
401         listenerLatch.await();
402 
403         assertEquals(nodes, listenerNotifications);
404         // assertTrue(monitorInvoked);
405         assertEquals(nodes, testElections.getStats().getInt(PROMISE_COUNT));
406 
407         // Now remove one node and the elections should give up after
408         // retries have expired.
409         electionNodes.get(0).getAcceptor().shutdown();
410         testElections = runElection(QuorumPolicy.ALL, nodes);
411 
412         assertEquals(0, listenerNotifications);
413         assertFalse(monitorInvoked);
414 
415         /* Ensure that all retries were due to lack of a Quorum. */
416         assertEquals
417             (nretries, testElections.getStats().getInt(PHASE1_NO_QUORUM));
418     }
419 
420     /**
421      * Tests the case where a quorum could not be reached.
422      *
423      * @throws IOException
424      * @throws InterruptedException
425      */
426     @Test
427     public void testNoQuorum() throws IOException, InterruptedException {
428 
429         Elections testElections = setupAndRunElection(nodes/2, nodes);
430         /*
431          * No listeners were invoked so don't wait for a latch. No quorum,
432          * therefore no listener invocations.
433          */
434         assertEquals(0, listenerNotifications);
435         assertFalse(monitorInvoked);
436         /* No listeners were invoked. */
437         assertEquals(nodes / 2, listenerLatch.getCount());
438         /* Ensure that all retries were due to lack of a Quorum. */
439         assertEquals
440             (nretries, testElections.getStats().getInt(PHASE1_NO_QUORUM));
441     }
442 }
443