1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.hbase.zookeeper; 20 21 import static org.junit.Assert.*; 22 23 import java.util.concurrent.atomic.AtomicBoolean; 24 25 import org.apache.commons.logging.Log; 26 import org.apache.commons.logging.LogFactory; 27 import org.apache.hadoop.conf.Configuration; 28 import org.apache.hadoop.hbase.*; 29 import org.apache.hadoop.hbase.testclassification.MediumTests; 30 import org.apache.hadoop.hbase.util.Bytes; 31 import org.junit.AfterClass; 32 import org.junit.BeforeClass; 33 import org.junit.Test; 34 import org.junit.experimental.categories.Category; 35 36 /** 37 */ 38 @Category(MediumTests.class) 39 public class TestZKLeaderManager { 40 private static final Log LOG = LogFactory.getLog(TestZKLeaderManager.class); 41 42 private static final String LEADER_ZNODE = 43 "/test/" + TestZKLeaderManager.class.getSimpleName(); 44 45 private static class MockAbortable implements Abortable { 46 private boolean aborted; 47 48 @Override abort(String why, Throwable e)49 public void abort(String why, Throwable e) { 50 aborted = true; 51 LOG.fatal("Aborting during test: "+why, e); 52 fail("Aborted during test: " + why); 53 } 54 55 @Override isAborted()56 public boolean isAborted() { 57 return aborted; 58 } 59 } 60 61 private static class MockLeader extends Thread implements Stoppable { 62 private boolean stopped; 63 private ZooKeeperWatcher watcher; 64 private ZKLeaderManager zkLeader; 65 private AtomicBoolean master = new AtomicBoolean(false); 66 private int index; 67 MockLeader(ZooKeeperWatcher watcher, int index)68 public MockLeader(ZooKeeperWatcher watcher, int index) { 69 setDaemon(true); 70 setName("TestZKLeaderManager-leader-" + index); 71 this.index = index; 72 this.watcher = watcher; 73 this.zkLeader = new ZKLeaderManager(watcher, LEADER_ZNODE, 74 Bytes.toBytes(index), this); 75 } 76 isMaster()77 public boolean isMaster() { 78 return master.get(); 79 } 80 getIndex()81 public int getIndex() { 82 return index; 83 } 84 getWatcher()85 public ZooKeeperWatcher getWatcher() { 86 return watcher; 87 } 88 run()89 public void run() { 90 while (!stopped) { 91 zkLeader.start(); 92 zkLeader.waitToBecomeLeader(); 93 master.set(true); 94 95 while (master.get() && !stopped) { 96 try { 97 Thread.sleep(10); 98 } catch (InterruptedException ignored) {} 99 } 100 } 101 } 102 abdicate()103 public void abdicate() { 104 zkLeader.stepDownAsLeader(); 105 master.set(false); 106 } 107 108 @Override stop(String why)109 public void stop(String why) { 110 stopped = true; 111 abdicate(); 112 watcher.close(); 113 } 114 115 @Override isStopped()116 public boolean isStopped() { 117 return stopped; 118 } 119 } 120 121 private static HBaseTestingUtility TEST_UTIL; 122 private static MockLeader[] CANDIDATES; 123 124 @BeforeClass setupBeforeClass()125 public static void setupBeforeClass() throws Exception { 126 TEST_UTIL = new HBaseTestingUtility(); 127 TEST_UTIL.startMiniZKCluster(); 128 Configuration conf = TEST_UTIL.getConfiguration(); 129 130 // use an abortable to fail the test in the case of any KeeperExceptions 131 MockAbortable abortable = new MockAbortable(); 132 CANDIDATES = new MockLeader[3]; 133 for (int i = 0; i < 3; i++) { 134 ZooKeeperWatcher watcher = newZK(conf, "server"+i, abortable); 135 CANDIDATES[i] = new MockLeader(watcher, i); 136 CANDIDATES[i].start(); 137 } 138 } 139 140 @AfterClass tearDownAfterClass()141 public static void tearDownAfterClass() throws Exception { 142 TEST_UTIL.shutdownMiniZKCluster(); 143 } 144 145 @Test testLeaderSelection()146 public void testLeaderSelection() throws Exception { 147 MockLeader currentLeader = getCurrentLeader(); 148 // one leader should have been found 149 assertNotNull("Leader should exist", currentLeader); 150 LOG.debug("Current leader index is "+currentLeader.getIndex()); 151 152 byte[] znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE); 153 assertNotNull("Leader znode should contain leader index", znodeData); 154 assertTrue("Leader znode should not be empty", znodeData.length > 0); 155 int storedIndex = Bytes.toInt(znodeData); 156 LOG.debug("Stored leader index in ZK is "+storedIndex); 157 assertEquals("Leader znode should match leader index", 158 currentLeader.getIndex(), storedIndex); 159 160 // force a leader transition 161 currentLeader.abdicate(); 162 assertFalse(currentLeader.isMaster()); 163 164 // check for new leader 165 currentLeader = getCurrentLeader(); 166 // one leader should have been found 167 assertNotNull("New leader should exist after abdication", currentLeader); 168 LOG.debug("New leader index is "+currentLeader.getIndex()); 169 170 znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE); 171 assertNotNull("Leader znode should contain leader index", znodeData); 172 assertTrue("Leader znode should not be empty", znodeData.length > 0); 173 storedIndex = Bytes.toInt(znodeData); 174 LOG.debug("Stored leader index in ZK is "+storedIndex); 175 assertEquals("Leader znode should match leader index", 176 currentLeader.getIndex(), storedIndex); 177 178 // force another transition by stopping the current 179 currentLeader.stop("Stopping for test"); 180 assertFalse(currentLeader.isMaster()); 181 182 // check for new leader 183 currentLeader = getCurrentLeader(); 184 // one leader should have been found 185 assertNotNull("New leader should exist after stop", currentLeader); 186 LOG.debug("New leader index is "+currentLeader.getIndex()); 187 188 znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE); 189 assertNotNull("Leader znode should contain leader index", znodeData); 190 assertTrue("Leader znode should not be empty", znodeData.length > 0); 191 storedIndex = Bytes.toInt(znodeData); 192 LOG.debug("Stored leader index in ZK is "+storedIndex); 193 assertEquals("Leader znode should match leader index", 194 currentLeader.getIndex(), storedIndex); 195 196 // with a second stop we can guarantee that a previous leader has resumed leading 197 currentLeader.stop("Stopping for test"); 198 assertFalse(currentLeader.isMaster()); 199 200 // check for new 201 currentLeader = getCurrentLeader(); 202 assertNotNull("New leader should exist", currentLeader); 203 } 204 getCurrentLeader()205 private MockLeader getCurrentLeader() throws Exception { 206 MockLeader currentLeader = null; 207 outer: 208 // Wait up to 10 secs for initial leader 209 for (int i = 0; i < 1000; i++) { 210 for (int j = 0; j < CANDIDATES.length; j++) { 211 if (CANDIDATES[j].isMaster()) { 212 // should only be one leader 213 if (currentLeader != null) { 214 fail("Both candidate "+currentLeader.getIndex()+" and "+j+" claim to be leader!"); 215 } 216 currentLeader = CANDIDATES[j]; 217 } 218 } 219 if (currentLeader != null) { 220 break outer; 221 } 222 Thread.sleep(10); 223 } 224 return currentLeader; 225 } 226 newZK(Configuration conf, String name, Abortable abort)227 private static ZooKeeperWatcher newZK(Configuration conf, String name, 228 Abortable abort) throws Exception { 229 Configuration copy = HBaseConfiguration.create(conf); 230 ZooKeeperWatcher zk = new ZooKeeperWatcher(copy, name, abort); 231 return zk; 232 } 233 234 } 235 236