/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2002, 2014 Oracle and/or its affiliates. All rights reserved.
 *
 */

package com.sleepycat.je.rep.elections;

import static com.sleepycat.je.rep.elections.ProposerStatDefinition.PHASE1_NO_NON_ZERO_PRIO;
import static com.sleepycat.je.rep.elections.ProposerStatDefinition.PHASE1_NO_QUORUM;
import static com.sleepycat.je.rep.elections.ProposerStatDefinition.PROMISE_COUNT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.sleepycat.je.rep.QuorumPolicy;
import com.sleepycat.je.rep.ReplicationNetworkConfig;
import com.sleepycat.je.rep.ReplicationConfig;
import com.sleepycat.je.rep.arbitration.Arbiter;
import com.sleepycat.je.rep.elections.Acceptor.SuggestionGenerator;
import com.sleepycat.je.rep.elections.Proposer.Proposal;
import com.sleepycat.je.rep.elections.Protocol.Value;
import com.sleepycat.je.rep.impl.RepGroupImpl;
import com.sleepycat.je.rep.impl.RepNodeImpl;
import com.sleepycat.je.rep.impl.node.ElectionQuorum;
import com.sleepycat.je.rep.impl.node.NameIdPair;
import com.sleepycat.je.rep.impl.node.RepNode;
import com.sleepycat.je.rep.net.DataChannelFactory;
import com.sleepycat.je.rep.utilint.RepTestUtils;
import com.sleepycat.je.rep.utilint.ServiceDispatcher;
import com.sleepycat.je.rep.utilint.net.DataChannelFactoryBuilder;
import com.sleepycat.util.test.TestBase;

/**
 * Tests for elections as a whole.
 *
 * Each test starts some number of simulated election participants, runs one
 * or more elections from the first participant and checks the number of
 * listener notifications and the proposer statistics that result.
 */
public class ElectionsTest extends TestBase {

    /* Number of nodes in the test */
    private static final int nodes = 3;
    private static final int monitors = 1;

    /* Retry count handed to initiateElection(); set by runElection(). */
    private int nretries;

    /* Guards listenerNotifications and listenerFailure. */
    private final Object notificationsLock = new Object();
    private int listenerNotifications = 0;

    /*
     * First assertion failure raised inside a listener callback, kept so
     * that it can be rethrown on the test thread by awaitListeners().
     */
    private AssertionError listenerFailure = null;

    /* Per-node configuration, indexed by node id. */
    private final ReplicationConfig[] repConfig =
        new ReplicationConfig[nodes + 1];
    // private Monitor monitor;
    private boolean monitorInvoked = false;

    /* Started participants, in the order in which they were started. */
    private final List<Elections> electionNodes = new LinkedList<Elections>();

    /* The value every listener is expected to be notified with. */
    private MasterValue winningValue = null;

    /* Latch to ensure that all required listeners have made it through. */
    CountDownLatch listenerLatch;

    private RepGroupImpl repGroup = null;

    /**
     * Creates the test replication group and a ReplicationConfig for each
     * of its electable members.
     *
     * @throws IOException propagated from reading the network properties
     */
    @Override
    @Before
    public void setUp() throws IOException {
        repGroup = RepTestUtils.createTestRepGroup(nodes, monitors);
        for (RepNodeImpl rn : repGroup.getAllElectableMembers()) {
            ReplicationConfig config = new ReplicationConfig();
            config.setRepNetConfig(
                ReplicationNetworkConfig.create(
                    RepTestUtils.readNetProps()));
            repConfig[rn.getNodeId()] = config;
            config.setNodeName(rn.getName());
            config.setNodeHostPort(rn.getHostName() + ":" + rn.getPort());
        }
    }

    /**
     * Shuts down the acceptors and learners and then every service
     * dispatcher that was started by the test.
     */
    @Override
    @After
    public void tearDown() throws Exception {
        if (electionNodes.isEmpty()) {
            return;
        }

        try {
            electionNodes.get(0).shutdownAcceptorsLearners(
                repGroup.getAllAcceptorSockets(),
                repGroup.getAllHelperSockets());
        } finally {
            /*
             * Shut the dispatchers down even if the call above failed, so
             * that they are not left running for the tests that follow.
             */
            for (Elections node : electionNodes) {
                node.getServiceDispatcher().shutdown();
            }
        }
    }

    /**
     * Simulates the start up of the first "n" nodes. If < n nodes are started,
     * the others simulate being down.
     *
     * @param nstart nodes to start up
     * @param groupSize the size of the group
     * @param testPriority if true, the nodes report an election priority
     * derived from their node number and all report the same ranking
     * @throws IOException propagated from creating the simulated nodes
     */
    public void startReplicationNodes(final int nstart,
                                      final int groupSize,
                                      final boolean testPriority)
        throws IOException {

        for (short nodeNum = 1; nodeNum <= nstart; nodeNum++) {
            Elections elections =
                new Elections(newRepNode(groupSize, nodeNum, testPriority),
                              new TestListener(),
                              newSuggestionGenerator(nodeNum, testPriority));
            elections.getRepNode().getServiceDispatcher().start();
            elections.startLearner();
            elections.participate();
            electionNodes.add(elections);
            elections.updateRepGroup(repGroup);
        }

        // Start up the Monitor as well.
        /*
        InetSocketAddress monitorSocket =
            repGroup.getMonitors().iterator().next().getLearnerSocket();
        monitor = new Monitor(repConfig[1].getGroupName(),
                              monitorSocket,
                              repGroup);
        monitor.setMonitorChangeListener(new MonitorChangeListener() {
            @Override
            public void replicationChange(MonitorChangeEvent monitorChangeEvent) {
                monitorInvoked = true;
                assertEquals(winningValue.getMasterNodeId(),
                        ((NewMasterEvent) monitorChangeEvent).getMasterId());
            }
        });
        monitor.startMonitor();
        */
    }

    /**
     * Starts the first nstart nodes of a group of the given size, without
     * the priority-based configuration.
     *
     * @param nstart nodes to start up
     * @param groupSize the size of the group
     * @throws IOException propagated from creating the simulated nodes
     */
    public void startReplicationNodes(final int nstart,
                                      final int groupSize)
        throws IOException {
        startReplicationNodes(nstart, groupSize, false);
    }

    /**
     * Creates a faked out RepNode for the given node number. Its quorum
     * check is computed against groupSize, and its election priority is
     * either derived from the node number (testPriority) or taken from the
     * node's ReplicationConfig.
     */
    private RepNode newRepNode(final int groupSize,
                               final short nodeNum,
                               final boolean testPriority)
        throws IOException {

        final DataChannelFactory channelFactory =
            DataChannelFactoryBuilder.construct(
                repConfig[nodeNum].getRepNetConfig(),
                repConfig[nodeNum].getNodeName());
        final ServiceDispatcher serviceDispatcher =
            new ServiceDispatcher(repConfig[nodeNum].getNodeSocketAddress(),
                                  channelFactory);

        return new RepNode(new NameIdPair(repConfig[nodeNum].getNodeName(),
                                          nodeNum),
                           serviceDispatcher) {
            @Override
            public ElectionQuorum getElectionQuorum() {
                return new ElectionQuorum() {

                    @Override
                    public boolean haveQuorum(QuorumPolicy quorumPolicy,
                                              int votes) {
                        return votes >= quorumPolicy.quorumSize(groupSize);
                    }
                };
            }

            @Override
            public int getElectionPriority() {
                /* With testPriority, lower node numbers get higher prio. */
                return testPriority ? (groupSize - nodeNum + 1) :
                    repConfig[nodeNum].getNodePriority();
            }

            /**
             * This faked out test node never really does arbitration, but
             * needs to be able to return an Arbiter instance
             * for election retries.
             */
            @Override
            public Arbiter getArbiter() {
                return new Arbiter(null) {
                    @Override
                    public synchronized boolean activateArbitration() {
                        return false;
                    }
                };
            }
        };
    }

    /**
     * Creates a suggestion generator that always suggests the node itself
     * as master. The ranking is the same constant for every node when
     * testPriority is set, and grows with the node number otherwise.
     */
    private SuggestionGenerator newSuggestionGenerator(final short nodeNum,
                                                       final boolean testPriority) {
        return new Acceptor.SuggestionGenerator() {
            @Override
            @SuppressWarnings("unused")
            public Value get(Proposal proposal) {
                return new MasterValue("testhost", 9999,
                                       new NameIdPair("n" + nodeNum,
                                                      nodeNum));
            }

            @Override
            @SuppressWarnings("unused")
            public long getRanking(Proposal proposal) {
                return testPriority ? 1000L : nodeNum * 10L;
            }
        };
    }

    /**
     * Starts nstart nodes in a group of exactly that size.
     *
     * @param nstart nodes to start up
     * @throws IOException propagated from creating the simulated nodes
     */
    public void startReplicationNodes(int nstart)
        throws IOException {
        startReplicationNodes(nstart, nstart);
    }

    /**
     * Listener that counts notifications and checks that the value it is
     * notified with is the expected winning value.
     */
    class TestListener implements Learner.Listener {

        @Override
        @SuppressWarnings("unused")
        public void notify(Proposal proposal, Value value) {
            try {
                synchronized (notificationsLock) {
                    listenerNotifications++;
                }
                assertEquals(winningValue, value);
            } catch (AssertionError e) {
                /*
                 * This callback does not run on the test thread, so record
                 * the failure for awaitListeners() to rethrow there.
                 */
                synchronized (notificationsLock) {
                    if (listenerFailure == null) {
                        listenerFailure = e;
                    }
                }
                throw e;
            } finally {
                /*
                 * Always release the latch; otherwise a failed assertion
                 * would leave the test thread blocked in await() forever.
                 */
                listenerLatch.countDown();
            }
        }
    }

    /**
     * Waits until every expected listener has been notified and then
     * rethrows, on the calling thread, the first assertion failure that was
     * recorded by a listener.
     */
    private void awaitListeners() throws InterruptedException {
        listenerLatch.await();
        synchronized (notificationsLock) {
            if (listenerFailure != null) {
                throw listenerFailure;
            }
        }
    }

    /**
     * Returns the number of listener notifications counted since the most
     * recent call to runElection().
     */
    private int getListenerNotifications() {
        synchronized (notificationsLock) {
            return listenerNotifications;
        }
    }

    /**
     * Starts activeNodes nodes in a group of groupSize, sets the expected
     * winner to the highest numbered active node and runs an election.
     */
    private Elections setupAndRunElection(QuorumPolicy qpolicy,
                                          int activeNodes,
                                          int groupSize)
        throws IOException, InterruptedException {

        /* Start all of them. */
        startReplicationNodes(activeNodes, groupSize);
        winningValue = new MasterValue("testhost", 9999,
                                       new NameIdPair("n" + (activeNodes),
                                                      (activeNodes)));
        return runElection(qpolicy, activeNodes);
    }

    private Elections setupAndRunElection(int activeNodes) throws IOException,
            InterruptedException {
        return setupAndRunElection(QuorumPolicy.SIMPLE_MAJORITY,
                                   activeNodes,
                                   activeNodes);
    }

    private Elections setupAndRunElection(int activeNodes, int groupSize)
        throws IOException, InterruptedException {
        return setupAndRunElection(QuorumPolicy.SIMPLE_MAJORITY,
                                   activeNodes,
                                   groupSize);
    }

    /**
     * Resets the per-election bookkeeping, initiates an election on the
     * first started node and waits for it to finish.
     *
     * @param qpolicy the quorum policy to use for the election
     * @param activeNodes the number of listeners expected to be notified
     * @return the Elections instance on which the election was initiated
     */
    private Elections runElection(QuorumPolicy qpolicy, int activeNodes)
        throws InterruptedException {
        synchronized (notificationsLock) {
            listenerNotifications = 0;
        }
        monitorInvoked = false;
        nretries = 2;
        listenerLatch = new CountDownLatch(activeNodes);
        /* Initiate an election on the first node. */
        Elections testElections = electionNodes.iterator().next();

        testElections.initiateElection(repGroup, qpolicy, nretries);
        /* Ensure that Proposer has finished. */
        testElections.waitForElection();
        return testElections;
    }

    private Elections runElection(int activeNodes)
        throws InterruptedException {

        return runElection(QuorumPolicy.SIMPLE_MAJORITY, activeNodes);
    }

    /**
     * Simulates presence of a simple majority, but with prio zero nodes
     */
    @Test
    public void testBasicZeroPrio() throws InterruptedException,
            IOException {

        /*
         * Elections with a mix of zero and non-zero prio nodes: every node
         * except the last one is given priority zero.
         */
        for (int i = 1; i < nodes; i++) {
            repConfig[i].setNodePriority(0);
        }
        setupAndRunElection(nodes);
        awaitListeners();
        assertEquals(nodes, getListenerNotifications());

        /*
         * Now shut down the acceptor of the only non-zero prio node and try
         * to hold an election; it should give up after the retries have
         * expired.
         */
        electionNodes.get(nodes - 1).getAcceptor().shutdown();
        Elections testElections = runElection(nodes);
        /* No successful elections, hence no notification. */
        assertEquals(0, getListenerNotifications());

        /* Ensure that all retries were due to lack of a non-zero prio node. */
        assertEquals
            (nretries, testElections.getStats().getInt(PHASE1_NO_NON_ZERO_PRIO));
    }

    /**
     * Tests a basic election with everything being normal.
     */
    @Test
    public void testBasicAllNodes()
        throws InterruptedException, IOException {

        /* Start all of them. */
        setupAndRunElection(nodes);
        awaitListeners();

        assertEquals(nodes, getListenerNotifications());
        // assertTrue(monitorInvoked);
        runElection(nodes);
        awaitListeners();
        assertEquals(nodes, getListenerNotifications());
        assertFalse(monitorInvoked);
    }

    /**
     * Tests a basic election in which the nodes have distinct election
     * priorities and identical rankings; node 1 is the expected winner.
     */
    @Test
    public void testBasicAllPrioNodes()
        throws InterruptedException, IOException {

        /* Start all of them. */
        startReplicationNodes(nodes, nodes, true);
        winningValue = new MasterValue("testhost", 9999,
                                       new NameIdPair("n1", 1));
        runElection(QuorumPolicy.SIMPLE_MAJORITY, nodes);
        awaitListeners();

        assertEquals(nodes, getListenerNotifications());
        // assertTrue(monitorInvoked);
        runElection(nodes);
        awaitListeners();
        assertEquals(nodes, getListenerNotifications());
        assertFalse(monitorInvoked);
    }

    /**
     * Simulates one node never having come up.
     */
    @Test
    public void testBasicAllButOneNode() throws InterruptedException,
            IOException {
        /*
         * Simulate one node down at startup, but sufficient nodes for a quorum.
         */
        setupAndRunElection(nodes - 1);
        awaitListeners();
        assertEquals(nodes - 1, getListenerNotifications());
        // assertTrue(monitorInvoked);
    }

    /**
     * Tests a basic election with one node having crashed.
     */
    @Test
    public void testBasicOneNodeCrash() throws InterruptedException,
            IOException {
        /* Start all of them. */
        Elections testElections = setupAndRunElection(nodes);
        awaitListeners();

        assertEquals(nodes, getListenerNotifications());
        // assertTrue(monitorInvoked);
        assertEquals(nodes, testElections.getStats().getInt(PROMISE_COUNT));
        electionNodes.get(0).getAcceptor().shutdown();
        testElections = runElection(nodes);
        awaitListeners();
        /* The listener should have still obtained a notification. */
        assertEquals(nodes, getListenerNotifications());
        /* Master unchanged so monitor not invoked */
        assertFalse(monitorInvoked);
        assertEquals(nodes - 1, testElections.getStats().getInt(PROMISE_COUNT));
    }

    /**
     * Tests a QuorumPolicy of ALL.
     */
    @Test
    public void testQuorumPolicyAll() throws InterruptedException, IOException {

        /* Start all of them. */
        Elections testElections =
            setupAndRunElection(QuorumPolicy.ALL, nodes, nodes);
        awaitListeners();

        assertEquals(nodes, getListenerNotifications());
        // assertTrue(monitorInvoked);
        assertEquals(nodes, testElections.getStats().getInt(PROMISE_COUNT));

        // Now remove one node and the elections should give up after
        // retries have expired.
        electionNodes.get(0).getAcceptor().shutdown();
        testElections = runElection(QuorumPolicy.ALL, nodes);

        assertEquals(0, getListenerNotifications());
        assertFalse(monitorInvoked);

        /* Ensure that all retries were due to lack of a Quorum. */
        assertEquals
            (nretries, testElections.getStats().getInt(PHASE1_NO_QUORUM));
    }

    /**
     * Tests the case where a quorum could not be reached.
     *
     * @throws IOException propagated from starting the simulated nodes
     * @throws InterruptedException if interrupted while the election runs
     */
    @Test
    public void testNoQuorum() throws IOException, InterruptedException {

        Elections testElections = setupAndRunElection(nodes / 2, nodes);
        /*
         * No listeners were invoked so don't wait for a latch. No quorum,
         * therefore no listener invocations.
         */
        assertEquals(0, getListenerNotifications());
        assertFalse(monitorInvoked);
        /* No listeners were invoked. */
        assertEquals(nodes / 2, listenerLatch.getCount());
        /* Ensure that all retries were due to lack of a Quorum. */
        assertEquals
            (nretries, testElections.getStats().getInt(PHASE1_NO_QUORUM));
    }
}