1 /* Licensed to the Apache Software Foundation (ASF) under one 2 * or more contributor license agreements. See the NOTICE file 3 * distributed with this work for additional information 4 * regarding copyright ownership. The ASF licenses this file 5 * to you under the Apache License, Version 2.0 (the 6 * "License"); you may not use this file except in compliance 7 * with the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package org.apache.zookeeper.test; 19 20 import static org.junit.jupiter.api.Assertions.assertEquals; 21 import static org.junit.jupiter.api.Assertions.assertTrue; 22 import java.io.ByteArrayInputStream; 23 import java.io.File; 24 import java.io.IOException; 25 import java.net.InetSocketAddress; 26 import java.util.HashMap; 27 import java.util.LinkedHashSet; 28 import java.util.Properties; 29 import java.util.Set; 30 import org.apache.zookeeper.PortAssignment; 31 import org.apache.zookeeper.TestableZooKeeper; 32 import org.apache.zookeeper.jmx.MBeanRegistry; 33 import org.apache.zookeeper.server.quorum.QuorumPeer; 34 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; 35 import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical; 36 import org.junit.jupiter.api.AfterEach; 37 import org.junit.jupiter.api.BeforeEach; 38 import org.junit.jupiter.api.Test; 39 import org.slf4j.Logger; 40 import org.slf4j.LoggerFactory; 41 public class HierarchicalQuorumTest extends ClientBase { 42 43 private static final Logger LOG = LoggerFactory.getLogger(QuorumBase.class); 44 45 File s1dir, s2dir, s3dir, s4dir, s5dir; 46 QuorumPeer s1, s2, s3, s4, s5; 47 protected int port1; 48 protected int port2; 49 protected int port3; 50 protected int port4; 51 protected int port5; 52 53 protected int leport1; 54 protected int leport2; 55 protected int leport3; 56 protected int leport4; 57 protected int leport5; 58 59 protected int clientport1; 60 protected int clientport2; 61 protected int clientport3; 62 protected int clientport4; 63 protected int clientport5; 64 65 Properties qp; 66 protected final ClientHammerTest cht = new ClientHammerTest(); 67 68 @BeforeEach 69 @Override setUp()70 public void setUp() throws Exception { 71 setupTestEnv(); 72 73 JMXEnv.setUp(); 74 75 setUpAll(); 76 77 port1 = PortAssignment.unique(); 78 port2 = PortAssignment.unique(); 79 port3 = PortAssignment.unique(); 80 port4 = PortAssignment.unique(); 81 port5 = PortAssignment.unique(); 82 leport1 = PortAssignment.unique(); 83 leport2 = PortAssignment.unique(); 84 leport3 = PortAssignment.unique(); 85 leport4 = PortAssignment.unique(); 86 leport5 = PortAssignment.unique(); 87 clientport1 = PortAssignment.unique(); 88 clientport2 = PortAssignment.unique(); 89 clientport3 = PortAssignment.unique(); 90 clientport4 = PortAssignment.unique(); 91 clientport5 = PortAssignment.unique(); 92 93 hostPort = "127.0.0.1:" + clientport1 94 + ",127.0.0.1:" + clientport2 95 + ",127.0.0.1:" + clientport3 96 + ",127.0.0.1:" + clientport4 97 + ",127.0.0.1:" + clientport5; 98 LOG.info("Ports are: {}", hostPort); 99 100 s1dir = ClientBase.createTmpDir(); 101 s2dir = ClientBase.createTmpDir(); 102 s3dir = ClientBase.createTmpDir(); 103 s4dir = ClientBase.createTmpDir(); 104 s5dir = ClientBase.createTmpDir(); 105 106 String config = "group.1=1:2:3\n" 107 + "group.2=4:5\n" 108 + "weight.1=1\n" 109 + "weight.2=1\n" 110 + "weight.3=1\n" 111 + "weight.4=0\n" 112 + "weight.5=0\n" 113 + "server.1=127.0.0.1:" + port1 + ":" + leport1 + ";" + clientport1 114 + "\n" + "server.2=127.0.0.1:" + port2 + ":" + leport2 + ";" + clientport2 115 + "\n" + "server.3=127.0.0.1:" + port3 + ":" + leport3 + ";" + clientport3 116 + "\n" + "server.4=127.0.0.1:" + port4 + ":" + leport4 + ";" + clientport4 117 + "\n" + "server.5=127.0.0.1:" + port5 + ":" + leport5 + ";" + clientport5 118 + "\n"; 119 120 ByteArrayInputStream is = new ByteArrayInputStream(config.getBytes()); 121 this.qp = new Properties(); 122 123 qp.load(is); 124 startServers(); 125 126 cht.hostPort = hostPort; 127 cht.setUpAll(); 128 129 LOG.info("Setup finished"); 130 } 131 132 /** 133 * This method is here to keep backwards compatibility with the test code 134 * written before observers. 135 * @throws Exception 136 */ startServers()137 void startServers() throws Exception { 138 startServers(false); 139 } 140 141 /** 142 * Starts 5 Learners. When withObservers == false, all 5 are Followers. 143 * When withObservers == true, 3 are Followers and 2 Observers. 144 * @param withObservers 145 * @throws Exception 146 */ startServers(boolean withObservers)147 void startServers(boolean withObservers) throws Exception { 148 int tickTime = 2000; 149 int initLimit = 3; 150 int syncLimit = 3; 151 int connectToLearnerMasterLimit = 3; 152 HashMap<Long, QuorumServer> peers = new HashMap<>(); 153 154 peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1", port1), new InetSocketAddress("127.0.0.1", leport1), new InetSocketAddress("127.0.0.1", clientport1))); 155 peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1", port2), new InetSocketAddress("127.0.0.1", leport2), new InetSocketAddress("127.0.0.1", clientport2))); 156 peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1", port3), new InetSocketAddress("127.0.0.1", leport3), new InetSocketAddress("127.0.0.1", clientport3))); 157 peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress("127.0.0.1", port4), new InetSocketAddress("127.0.0.1", leport4), new InetSocketAddress("127.0.0.1", clientport4), withObservers ? QuorumPeer.LearnerType.OBSERVER : QuorumPeer.LearnerType.PARTICIPANT)); 158 peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress("127.0.0.1", port5), new InetSocketAddress("127.0.0.1", leport5), new InetSocketAddress("127.0.0.1", clientport5), withObservers ? QuorumPeer.LearnerType.OBSERVER : QuorumPeer.LearnerType.PARTICIPANT)); 159 160 LOG.info("creating QuorumPeer 1 port {}", clientport1); 161 162 if (withObservers) { 163 qp.setProperty("server.4", "127.0.0.1:" + port4 + ":" + leport4 + ":observer" + ";" + clientport4); 164 qp.setProperty("server.5", "127.0.0.1:" + port5 + ":" + leport5 + ":observer" + ";" + clientport5); 165 } 166 QuorumHierarchical hq1 = new QuorumHierarchical(qp); 167 s1 = new QuorumPeer(peers, s1dir, s1dir, clientport1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq1); 168 assertEquals(clientport1, s1.getClientPort()); 169 170 LOG.info("creating QuorumPeer 2 port {}", clientport2); 171 QuorumHierarchical hq2 = new QuorumHierarchical(qp); 172 s2 = new QuorumPeer(peers, s2dir, s2dir, clientport2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq2); 173 assertEquals(clientport2, s2.getClientPort()); 174 175 LOG.info("creating QuorumPeer 3 port {}", clientport3); 176 QuorumHierarchical hq3 = new QuorumHierarchical(qp); 177 s3 = new QuorumPeer(peers, s3dir, s3dir, clientport3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq3); 178 assertEquals(clientport3, s3.getClientPort()); 179 180 LOG.info("creating QuorumPeer 4 port {}", clientport4); 181 QuorumHierarchical hq4 = new QuorumHierarchical(qp); 182 s4 = new QuorumPeer(peers, s4dir, s4dir, clientport4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq4); 183 if (withObservers) { 184 s4.setLearnerType(QuorumPeer.LearnerType.OBSERVER); 185 } 186 assertEquals(clientport4, s4.getClientPort()); 187 188 LOG.info("creating QuorumPeer 5 port {}", clientport5); 189 QuorumHierarchical hq5 = new QuorumHierarchical(qp); 190 s5 = new QuorumPeer(peers, s5dir, s5dir, clientport5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq5); 191 if (withObservers) { 192 s5.setLearnerType(QuorumPeer.LearnerType.OBSERVER); 193 } 194 assertEquals(clientport5, s5.getClientPort()); 195 196 LOG.info("start QuorumPeer 1"); 197 s1.start(); 198 LOG.info("start QuorumPeer 2"); 199 s2.start(); 200 LOG.info("start QuorumPeer 3"); 201 s3.start(); 202 LOG.info("start QuorumPeer 4{}", (withObservers ? "(observer)" : "")); 203 s4.start(); 204 LOG.info("start QuorumPeer 5{}", (withObservers ? "(observer)" : "")); 205 s5.start(); 206 LOG.info("started QuorumPeer 5"); 207 208 LOG.info("Closing ports {}", hostPort); 209 for (String hp : hostPort.split(",")) { 210 assertTrue(ClientBase.waitForServerUp(hp, CONNECTION_TIMEOUT), "waiting for server up"); 211 LOG.info("{} is accepting client connections", hp); 212 } 213 final int numberOfPeers = 5; 214 // interesting to see what's there... 215 JMXEnv.dump(); 216 // make sure we have these 5 servers listed 217 Set<String> ensureNames = new LinkedHashSet<String>(); 218 for (int i = 1; i <= numberOfPeers; i++) { 219 ensureNames.add("InMemoryDataTree"); 220 } 221 for (int i = 1; i <= numberOfPeers; i++) { 222 ensureNames.add("name0=ReplicatedServer_id" + i + ",name1=replica." + i + ",name2="); 223 } 224 for (int i = 1; i <= numberOfPeers; i++) { 225 for (int j = 1; j <= numberOfPeers; j++) { 226 ensureNames.add("name0=ReplicatedServer_id" + i + ",name1=replica." + j); 227 } 228 } 229 for (int i = 1; i <= numberOfPeers; i++) { 230 ensureNames.add("name0=ReplicatedServer_id" + i); 231 } 232 JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()])); 233 for (int i = 1; i <= numberOfPeers; i++) { 234 // LocalPeerBean 235 String bean = MBeanRegistry.DOMAIN + ":name0=ReplicatedServer_id" + i + ",name1=replica." + i; 236 JMXEnv.ensureBeanAttribute(bean, "ConfigVersion"); 237 JMXEnv.ensureBeanAttribute(bean, "LearnerType"); 238 JMXEnv.ensureBeanAttribute(bean, "ClientAddress"); 239 JMXEnv.ensureBeanAttribute(bean, "ElectionAddress"); 240 JMXEnv.ensureBeanAttribute(bean, "QuorumSystemInfo"); 241 JMXEnv.ensureBeanAttribute(bean, "Leader"); 242 } 243 244 for (int i = 1; i <= numberOfPeers; i++) { 245 for (int j = 1; j <= numberOfPeers; j++) { 246 if (j != i) { 247 // RemotePeerBean 248 String bean = MBeanRegistry.DOMAIN + ":name0=ReplicatedServer_id" + i + ",name1=replica." + j; 249 JMXEnv.ensureBeanAttribute(bean, "Name"); 250 JMXEnv.ensureBeanAttribute(bean, "LearnerType"); 251 JMXEnv.ensureBeanAttribute(bean, "ClientAddress"); 252 JMXEnv.ensureBeanAttribute(bean, "ElectionAddress"); 253 JMXEnv.ensureBeanAttribute(bean, "QuorumAddress"); 254 JMXEnv.ensureBeanAttribute(bean, "Leader"); 255 } 256 } 257 } 258 } 259 260 @AfterEach 261 @Override tearDown()262 public void tearDown() throws Exception { 263 LOG.info("TearDown started"); 264 cht.tearDownAll(); 265 266 LOG.info("Shutting down server 1"); 267 shutdown(s1); 268 LOG.info("Shutting down server 2"); 269 shutdown(s2); 270 LOG.info("Shutting down server 3"); 271 shutdown(s3); 272 LOG.info("Shutting down server 4"); 273 shutdown(s4); 274 LOG.info("Shutting down server 5"); 275 shutdown(s5); 276 277 for (String hp : hostPort.split(",")) { 278 assertTrue(ClientBase.waitForServerDown(hp, ClientBase.CONNECTION_TIMEOUT), "waiting for server down"); 279 LOG.info("{} is no longer accepting client connections", hp); 280 } 281 282 JMXEnv.tearDown(); 283 } 284 shutdown(QuorumPeer qp)285 protected void shutdown(QuorumPeer qp) { 286 QuorumBase.shutdown(qp); 287 } 288 createClient()289 protected TestableZooKeeper createClient() throws IOException, InterruptedException { 290 return createClient(hostPort); 291 } 292 createClient(String hp)293 protected TestableZooKeeper createClient(String hp) throws IOException, InterruptedException { 294 CountdownWatcher watcher = new CountdownWatcher(); 295 return createClient(watcher, hp); 296 } 297 298 @Test testHierarchicalQuorum()299 public void testHierarchicalQuorum() throws Throwable { 300 cht.runHammer(5, 10); 301 } 302 303 } 304