1 /* Licensed to the Apache Software Foundation (ASF) under one
2  * or more contributor license agreements.  See the NOTICE file
3  * distributed with this work for additional information
4  * regarding copyright ownership.  The ASF licenses this file
5  * to you under the Apache License, Version 2.0 (the
6  * "License"); you may not use this file except in compliance
7  * with the License.  You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 package org.apache.zookeeper.test;
19 
20 import static org.junit.jupiter.api.Assertions.assertEquals;
21 import static org.junit.jupiter.api.Assertions.assertTrue;
22 import java.io.ByteArrayInputStream;
23 import java.io.File;
24 import java.io.IOException;
25 import java.net.InetSocketAddress;
26 import java.util.HashMap;
27 import java.util.LinkedHashSet;
28 import java.util.Properties;
29 import java.util.Set;
30 import org.apache.zookeeper.PortAssignment;
31 import org.apache.zookeeper.TestableZooKeeper;
32 import org.apache.zookeeper.jmx.MBeanRegistry;
33 import org.apache.zookeeper.server.quorum.QuorumPeer;
34 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
35 import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
36 import org.junit.jupiter.api.AfterEach;
37 import org.junit.jupiter.api.BeforeEach;
38 import org.junit.jupiter.api.Test;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 public class HierarchicalQuorumTest extends ClientBase {
42 
43     private static final Logger LOG = LoggerFactory.getLogger(QuorumBase.class);
44 
45     File s1dir, s2dir, s3dir, s4dir, s5dir;
46     QuorumPeer s1, s2, s3, s4, s5;
47     protected int port1;
48     protected int port2;
49     protected int port3;
50     protected int port4;
51     protected int port5;
52 
53     protected int leport1;
54     protected int leport2;
55     protected int leport3;
56     protected int leport4;
57     protected int leport5;
58 
59     protected int clientport1;
60     protected int clientport2;
61     protected int clientport3;
62     protected int clientport4;
63     protected int clientport5;
64 
65     Properties qp;
66     protected final ClientHammerTest cht = new ClientHammerTest();
67 
68     @BeforeEach
69     @Override
setUp()70     public void setUp() throws Exception {
71         setupTestEnv();
72 
73         JMXEnv.setUp();
74 
75         setUpAll();
76 
77         port1 = PortAssignment.unique();
78         port2 = PortAssignment.unique();
79         port3 = PortAssignment.unique();
80         port4 = PortAssignment.unique();
81         port5 = PortAssignment.unique();
82         leport1 = PortAssignment.unique();
83         leport2 = PortAssignment.unique();
84         leport3 = PortAssignment.unique();
85         leport4 = PortAssignment.unique();
86         leport5 = PortAssignment.unique();
87         clientport1 = PortAssignment.unique();
88         clientport2 = PortAssignment.unique();
89         clientport3 = PortAssignment.unique();
90         clientport4 = PortAssignment.unique();
91         clientport5 = PortAssignment.unique();
92 
93         hostPort = "127.0.0.1:" + clientport1
94                    + ",127.0.0.1:" + clientport2
95                    + ",127.0.0.1:" + clientport3
96                    + ",127.0.0.1:" + clientport4
97                    + ",127.0.0.1:" + clientport5;
98         LOG.info("Ports are: {}", hostPort);
99 
100         s1dir = ClientBase.createTmpDir();
101         s2dir = ClientBase.createTmpDir();
102         s3dir = ClientBase.createTmpDir();
103         s4dir = ClientBase.createTmpDir();
104         s5dir = ClientBase.createTmpDir();
105 
106         String config = "group.1=1:2:3\n"
107                         + "group.2=4:5\n"
108                         + "weight.1=1\n"
109                         + "weight.2=1\n"
110                         + "weight.3=1\n"
111                         + "weight.4=0\n"
112                         + "weight.5=0\n"
113                         + "server.1=127.0.0.1:" + port1 + ":" + leport1 + ";" + clientport1
114                         + "\n" + "server.2=127.0.0.1:" + port2 + ":" + leport2 + ";" + clientport2
115                         + "\n" + "server.3=127.0.0.1:" + port3 + ":" + leport3 + ";" + clientport3
116                         + "\n" + "server.4=127.0.0.1:" + port4 + ":" + leport4 + ";" + clientport4
117                         + "\n" + "server.5=127.0.0.1:" + port5 + ":" + leport5 + ";" + clientport5
118                         + "\n";
119 
120         ByteArrayInputStream is = new ByteArrayInputStream(config.getBytes());
121         this.qp = new Properties();
122 
123         qp.load(is);
124         startServers();
125 
126         cht.hostPort = hostPort;
127         cht.setUpAll();
128 
129         LOG.info("Setup finished");
130     }
131 
132     /**
133      * This method is here to keep backwards compatibility with the test code
134      * written before observers.
135      * @throws Exception
136      */
startServers()137     void startServers() throws Exception {
138         startServers(false);
139     }
140 
141     /**
142      * Starts 5 Learners. When withObservers == false, all 5 are Followers.
143      * When withObservers == true, 3 are Followers and 2 Observers.
144      * @param withObservers
145      * @throws Exception
146      */
startServers(boolean withObservers)147     void startServers(boolean withObservers) throws Exception {
148         int tickTime = 2000;
149         int initLimit = 3;
150         int syncLimit = 3;
151         int connectToLearnerMasterLimit = 3;
152         HashMap<Long, QuorumServer> peers = new HashMap<>();
153 
154         peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1", port1), new InetSocketAddress("127.0.0.1", leport1), new InetSocketAddress("127.0.0.1", clientport1)));
155         peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1", port2), new InetSocketAddress("127.0.0.1", leport2), new InetSocketAddress("127.0.0.1", clientport2)));
156         peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1", port3), new InetSocketAddress("127.0.0.1", leport3), new InetSocketAddress("127.0.0.1", clientport3)));
157         peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress("127.0.0.1", port4), new InetSocketAddress("127.0.0.1", leport4), new InetSocketAddress("127.0.0.1", clientport4), withObservers ? QuorumPeer.LearnerType.OBSERVER : QuorumPeer.LearnerType.PARTICIPANT));
158         peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress("127.0.0.1", port5), new InetSocketAddress("127.0.0.1", leport5), new InetSocketAddress("127.0.0.1", clientport5), withObservers ? QuorumPeer.LearnerType.OBSERVER : QuorumPeer.LearnerType.PARTICIPANT));
159 
160         LOG.info("creating QuorumPeer 1 port {}", clientport1);
161 
162         if (withObservers) {
163             qp.setProperty("server.4", "127.0.0.1:" + port4 + ":" + leport4 + ":observer" + ";" + clientport4);
164             qp.setProperty("server.5", "127.0.0.1:" + port5 + ":" + leport5 + ":observer" + ";" + clientport5);
165         }
166         QuorumHierarchical hq1 = new QuorumHierarchical(qp);
167         s1 = new QuorumPeer(peers, s1dir, s1dir, clientport1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq1);
168         assertEquals(clientport1, s1.getClientPort());
169 
170         LOG.info("creating QuorumPeer 2 port {}", clientport2);
171         QuorumHierarchical hq2 = new QuorumHierarchical(qp);
172         s2 = new QuorumPeer(peers, s2dir, s2dir, clientport2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq2);
173         assertEquals(clientport2, s2.getClientPort());
174 
175         LOG.info("creating QuorumPeer 3 port {}", clientport3);
176         QuorumHierarchical hq3 = new QuorumHierarchical(qp);
177         s3 = new QuorumPeer(peers, s3dir, s3dir, clientport3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq3);
178         assertEquals(clientport3, s3.getClientPort());
179 
180         LOG.info("creating QuorumPeer 4 port {}", clientport4);
181         QuorumHierarchical hq4 = new QuorumHierarchical(qp);
182         s4 = new QuorumPeer(peers, s4dir, s4dir, clientport4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq4);
183         if (withObservers) {
184             s4.setLearnerType(QuorumPeer.LearnerType.OBSERVER);
185         }
186         assertEquals(clientport4, s4.getClientPort());
187 
188         LOG.info("creating QuorumPeer 5 port {}", clientport5);
189         QuorumHierarchical hq5 = new QuorumHierarchical(qp);
190         s5 = new QuorumPeer(peers, s5dir, s5dir, clientport5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq5);
191         if (withObservers) {
192             s5.setLearnerType(QuorumPeer.LearnerType.OBSERVER);
193         }
194         assertEquals(clientport5, s5.getClientPort());
195 
196         LOG.info("start QuorumPeer 1");
197         s1.start();
198         LOG.info("start QuorumPeer 2");
199         s2.start();
200         LOG.info("start QuorumPeer 3");
201         s3.start();
202         LOG.info("start QuorumPeer 4{}", (withObservers ? "(observer)" : ""));
203         s4.start();
204         LOG.info("start QuorumPeer 5{}", (withObservers ? "(observer)" : ""));
205         s5.start();
206         LOG.info("started QuorumPeer 5");
207 
208         LOG.info("Closing ports {}", hostPort);
209         for (String hp : hostPort.split(",")) {
210             assertTrue(ClientBase.waitForServerUp(hp, CONNECTION_TIMEOUT), "waiting for server up");
211             LOG.info("{} is accepting client connections", hp);
212         }
213         final int numberOfPeers = 5;
214         // interesting to see what's there...
215         JMXEnv.dump();
216         // make sure we have these 5 servers listed
217         Set<String> ensureNames = new LinkedHashSet<String>();
218         for (int i = 1; i <= numberOfPeers; i++) {
219             ensureNames.add("InMemoryDataTree");
220         }
221         for (int i = 1; i <= numberOfPeers; i++) {
222             ensureNames.add("name0=ReplicatedServer_id" + i + ",name1=replica." + i + ",name2=");
223         }
224         for (int i = 1; i <= numberOfPeers; i++) {
225             for (int j = 1; j <= numberOfPeers; j++) {
226                 ensureNames.add("name0=ReplicatedServer_id" + i + ",name1=replica." + j);
227             }
228         }
229         for (int i = 1; i <= numberOfPeers; i++) {
230             ensureNames.add("name0=ReplicatedServer_id" + i);
231         }
232         JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()]));
233         for (int i = 1; i <= numberOfPeers; i++) {
234             // LocalPeerBean
235             String bean = MBeanRegistry.DOMAIN + ":name0=ReplicatedServer_id" + i + ",name1=replica." + i;
236             JMXEnv.ensureBeanAttribute(bean, "ConfigVersion");
237             JMXEnv.ensureBeanAttribute(bean, "LearnerType");
238             JMXEnv.ensureBeanAttribute(bean, "ClientAddress");
239             JMXEnv.ensureBeanAttribute(bean, "ElectionAddress");
240             JMXEnv.ensureBeanAttribute(bean, "QuorumSystemInfo");
241             JMXEnv.ensureBeanAttribute(bean, "Leader");
242         }
243 
244         for (int i = 1; i <= numberOfPeers; i++) {
245             for (int j = 1; j <= numberOfPeers; j++) {
246                 if (j != i) {
247                     // RemotePeerBean
248                     String bean = MBeanRegistry.DOMAIN + ":name0=ReplicatedServer_id" + i + ",name1=replica." + j;
249                     JMXEnv.ensureBeanAttribute(bean, "Name");
250                     JMXEnv.ensureBeanAttribute(bean, "LearnerType");
251                     JMXEnv.ensureBeanAttribute(bean, "ClientAddress");
252                     JMXEnv.ensureBeanAttribute(bean, "ElectionAddress");
253                     JMXEnv.ensureBeanAttribute(bean, "QuorumAddress");
254                     JMXEnv.ensureBeanAttribute(bean, "Leader");
255                 }
256             }
257         }
258     }
259 
260     @AfterEach
261     @Override
tearDown()262     public void tearDown() throws Exception {
263         LOG.info("TearDown started");
264         cht.tearDownAll();
265 
266         LOG.info("Shutting down server 1");
267         shutdown(s1);
268         LOG.info("Shutting down server 2");
269         shutdown(s2);
270         LOG.info("Shutting down server 3");
271         shutdown(s3);
272         LOG.info("Shutting down server 4");
273         shutdown(s4);
274         LOG.info("Shutting down server 5");
275         shutdown(s5);
276 
277         for (String hp : hostPort.split(",")) {
278             assertTrue(ClientBase.waitForServerDown(hp, ClientBase.CONNECTION_TIMEOUT), "waiting for server down");
279             LOG.info("{} is no longer accepting client connections", hp);
280         }
281 
282         JMXEnv.tearDown();
283     }
284 
shutdown(QuorumPeer qp)285     protected void shutdown(QuorumPeer qp) {
286         QuorumBase.shutdown(qp);
287     }
288 
createClient()289     protected TestableZooKeeper createClient() throws IOException, InterruptedException {
290         return createClient(hostPort);
291     }
292 
createClient(String hp)293     protected TestableZooKeeper createClient(String hp) throws IOException, InterruptedException {
294         CountdownWatcher watcher = new CountdownWatcher();
295         return createClient(watcher, hp);
296     }
297 
298     @Test
testHierarchicalQuorum()299     public void testHierarchicalQuorum() throws Throwable {
300         cht.runHammer(5, 10);
301     }
302 
303 }
304