1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.hbase.zookeeper;
20 
21 import static org.junit.Assert.*;
22 
23 import java.util.concurrent.atomic.AtomicBoolean;
24 
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.*;
29 import org.apache.hadoop.hbase.testclassification.MediumTests;
30 import org.apache.hadoop.hbase.util.Bytes;
31 import org.junit.AfterClass;
32 import org.junit.BeforeClass;
33 import org.junit.Test;
34 import org.junit.experimental.categories.Category;
35 
36 /**
37  */
38 @Category(MediumTests.class)
39 public class TestZKLeaderManager {
40   private static final Log LOG = LogFactory.getLog(TestZKLeaderManager.class);
41 
42   private static final String LEADER_ZNODE =
43       "/test/" + TestZKLeaderManager.class.getSimpleName();
44 
45   private static class MockAbortable implements Abortable {
46     private boolean aborted;
47 
48     @Override
abort(String why, Throwable e)49     public void abort(String why, Throwable e) {
50       aborted = true;
51       LOG.fatal("Aborting during test: "+why, e);
52       fail("Aborted during test: " + why);
53     }
54 
55     @Override
isAborted()56     public boolean isAborted() {
57       return aborted;
58     }
59   }
60 
61   private static class MockLeader extends Thread implements Stoppable {
62     private boolean stopped;
63     private ZooKeeperWatcher watcher;
64     private ZKLeaderManager zkLeader;
65     private AtomicBoolean master = new AtomicBoolean(false);
66     private int index;
67 
MockLeader(ZooKeeperWatcher watcher, int index)68     public MockLeader(ZooKeeperWatcher watcher, int index) {
69       setDaemon(true);
70       setName("TestZKLeaderManager-leader-" + index);
71       this.index = index;
72       this.watcher = watcher;
73       this.zkLeader = new ZKLeaderManager(watcher, LEADER_ZNODE,
74           Bytes.toBytes(index), this);
75     }
76 
isMaster()77     public boolean isMaster() {
78       return master.get();
79     }
80 
getIndex()81     public int getIndex() {
82       return index;
83     }
84 
getWatcher()85     public ZooKeeperWatcher getWatcher() {
86       return watcher;
87     }
88 
run()89     public void run() {
90       while (!stopped) {
91         zkLeader.start();
92         zkLeader.waitToBecomeLeader();
93         master.set(true);
94 
95         while (master.get() && !stopped) {
96           try {
97             Thread.sleep(10);
98           } catch (InterruptedException ignored) {}
99         }
100       }
101     }
102 
abdicate()103     public void abdicate() {
104       zkLeader.stepDownAsLeader();
105       master.set(false);
106     }
107 
108     @Override
stop(String why)109     public void stop(String why) {
110       stopped = true;
111       abdicate();
112       watcher.close();
113     }
114 
115     @Override
isStopped()116     public boolean isStopped() {
117       return stopped;
118     }
119   }
120 
121   private static HBaseTestingUtility TEST_UTIL;
122   private static MockLeader[] CANDIDATES;
123 
124   @BeforeClass
setupBeforeClass()125   public static void setupBeforeClass() throws Exception {
126     TEST_UTIL = new HBaseTestingUtility();
127     TEST_UTIL.startMiniZKCluster();
128     Configuration conf = TEST_UTIL.getConfiguration();
129 
130     // use an abortable to fail the test in the case of any KeeperExceptions
131     MockAbortable abortable = new MockAbortable();
132     CANDIDATES = new MockLeader[3];
133     for (int i = 0; i < 3; i++) {
134       ZooKeeperWatcher watcher = newZK(conf, "server"+i, abortable);
135       CANDIDATES[i] = new MockLeader(watcher, i);
136       CANDIDATES[i].start();
137     }
138   }
139 
140   @AfterClass
tearDownAfterClass()141   public static void tearDownAfterClass() throws Exception {
142     TEST_UTIL.shutdownMiniZKCluster();
143   }
144 
145   @Test
testLeaderSelection()146   public void testLeaderSelection() throws Exception {
147     MockLeader currentLeader = getCurrentLeader();
148     // one leader should have been found
149     assertNotNull("Leader should exist", currentLeader);
150     LOG.debug("Current leader index is "+currentLeader.getIndex());
151 
152     byte[] znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
153     assertNotNull("Leader znode should contain leader index", znodeData);
154     assertTrue("Leader znode should not be empty", znodeData.length > 0);
155     int storedIndex = Bytes.toInt(znodeData);
156     LOG.debug("Stored leader index in ZK is "+storedIndex);
157     assertEquals("Leader znode should match leader index",
158         currentLeader.getIndex(), storedIndex);
159 
160     // force a leader transition
161     currentLeader.abdicate();
162     assertFalse(currentLeader.isMaster());
163 
164     // check for new leader
165     currentLeader = getCurrentLeader();
166     // one leader should have been found
167     assertNotNull("New leader should exist after abdication", currentLeader);
168     LOG.debug("New leader index is "+currentLeader.getIndex());
169 
170     znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
171     assertNotNull("Leader znode should contain leader index", znodeData);
172     assertTrue("Leader znode should not be empty", znodeData.length > 0);
173     storedIndex = Bytes.toInt(znodeData);
174     LOG.debug("Stored leader index in ZK is "+storedIndex);
175     assertEquals("Leader znode should match leader index",
176         currentLeader.getIndex(), storedIndex);
177 
178     // force another transition by stopping the current
179     currentLeader.stop("Stopping for test");
180     assertFalse(currentLeader.isMaster());
181 
182     // check for new leader
183     currentLeader = getCurrentLeader();
184     // one leader should have been found
185     assertNotNull("New leader should exist after stop", currentLeader);
186     LOG.debug("New leader index is "+currentLeader.getIndex());
187 
188     znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
189     assertNotNull("Leader znode should contain leader index", znodeData);
190     assertTrue("Leader znode should not be empty", znodeData.length > 0);
191     storedIndex = Bytes.toInt(znodeData);
192     LOG.debug("Stored leader index in ZK is "+storedIndex);
193     assertEquals("Leader znode should match leader index",
194         currentLeader.getIndex(), storedIndex);
195 
196     // with a second stop we can guarantee that a previous leader has resumed leading
197     currentLeader.stop("Stopping for test");
198     assertFalse(currentLeader.isMaster());
199 
200     // check for new
201     currentLeader = getCurrentLeader();
202     assertNotNull("New leader should exist", currentLeader);
203   }
204 
getCurrentLeader()205   private MockLeader getCurrentLeader() throws Exception {
206     MockLeader currentLeader = null;
207     outer:
208     // Wait up to 10 secs for initial leader
209     for (int i = 0; i < 1000; i++) {
210       for (int j = 0; j < CANDIDATES.length; j++) {
211         if (CANDIDATES[j].isMaster()) {
212           // should only be one leader
213           if (currentLeader != null) {
214             fail("Both candidate "+currentLeader.getIndex()+" and "+j+" claim to be leader!");
215           }
216           currentLeader = CANDIDATES[j];
217         }
218       }
219       if (currentLeader != null) {
220         break outer;
221       }
222       Thread.sleep(10);
223     }
224     return currentLeader;
225   }
226 
newZK(Configuration conf, String name, Abortable abort)227   private static ZooKeeperWatcher newZK(Configuration conf, String name,
228       Abortable abort) throws Exception {
229     Configuration copy = HBaseConfiguration.create(conf);
230     ZooKeeperWatcher zk = new ZooKeeperWatcher(copy, name, abort);
231     return zk;
232   }
233 
234 }
235 
236