1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.zookeeper.server.quorum;
20 
21 import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
22 import static org.junit.jupiter.api.Assertions.assertEquals;
23 import static org.junit.jupiter.api.Assertions.assertNotNull;
24 import static org.junit.jupiter.api.Assertions.assertNull;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26 import java.io.IOException;
27 import java.net.SocketTimeoutException;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.TimeUnit;
30 import javax.security.sasl.SaslException;
31 import org.apache.zookeeper.AsyncCallback;
32 import org.apache.zookeeper.CreateMode;
33 import org.apache.zookeeper.PortAssignment;
34 import org.apache.zookeeper.ZooDefs.Ids;
35 import org.apache.zookeeper.ZooKeeper;
36 import org.apache.zookeeper.data.Stat;
37 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
38 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
39 import org.apache.zookeeper.test.ClientBase;
40 import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
41 import org.junit.jupiter.api.AfterEach;
42 import org.junit.jupiter.api.Test;
43 import org.junit.jupiter.api.Timeout;
44 
45 public class EphemeralNodeDeletionTest extends QuorumPeerTestBase {
46 
47     private static int SERVER_COUNT = 3;
48     private MainThread[] mt = new MainThread[SERVER_COUNT];
49 
50     /**
51      * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2355.
52      * ZooKeeper ephemeral node is never deleted if follower fail while reading
53      * the proposal packet.
54      */
55 
56     @Test
57     @Timeout(value = 120)
testEphemeralNodeDeletion()58     public void testEphemeralNodeDeletion() throws Exception {
59         final int[] clientPorts = new int[SERVER_COUNT];
60         StringBuilder sb = new StringBuilder();
61         String server;
62 
63         for (int i = 0; i < SERVER_COUNT; i++) {
64             clientPorts[i] = PortAssignment.unique();
65             server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
66                     + ":participant;127.0.0.1:" + clientPorts[i];
67             sb.append(server + "\n");
68         }
69         String currentQuorumCfgSection = sb.toString();
70         // start all the servers
71         for (int i = 0; i < SERVER_COUNT; i++) {
72             mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
73                 @Override
74                 public TestQPMain getTestQPMain() {
75                     return new MockTestQPMain();
76                 }
77             };
78             mt[i].start();
79         }
80 
81         // ensure all servers started
82         for (int i = 0; i < SERVER_COUNT; i++) {
83             assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
84                     "waiting for server " + i + " being up");
85         }
86 
87         CountdownWatcher watch = new CountdownWatcher();
88         ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[1], ClientBase.CONNECTION_TIMEOUT, watch);
89         watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
90 
91         /**
92          * now the problem scenario starts
93          */
94 
95         Stat firstEphemeralNode = new Stat();
96 
97         // 1: create ephemeral node
98         String nodePath = "/e1";
99         zk.create(nodePath, "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, firstEphemeralNode);
100         assertEquals(zk.getSessionId(), firstEphemeralNode.getEphemeralOwner(),
101                 "Current session and ephemeral owner should be same");
102 
103         // 2: inject network problem in one of the follower
104         CustomQuorumPeer follower = (CustomQuorumPeer) getByServerState(mt, ServerState.FOLLOWING);
105         follower.setInjectError(true);
106 
107         // 3: close the session so that ephemeral node is deleted
108         zk.close();
109 
110         // remove the error
111         follower.setInjectError(false);
112 
113         assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + follower.getClientPort(), CONNECTION_TIMEOUT),
114                 "Faulted Follower should have joined quorum by now");
115 
116         QuorumPeer leader = getByServerState(mt, ServerState.LEADING);
117         assertNotNull(leader, "Leader should not be null");
118         assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + leader.getClientPort(), CONNECTION_TIMEOUT),
119                 "Leader must be running");
120 
121         watch = new CountdownWatcher();
122         zk = new ZooKeeper("127.0.0.1:" + leader.getClientPort(), ClientBase.CONNECTION_TIMEOUT, watch);
123         watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
124 
125         Stat exists = zk.exists(nodePath, false);
126         assertNull(exists, "Node must have been deleted from leader");
127 
128         CountdownWatcher followerWatch = new CountdownWatcher();
129         ZooKeeper followerZK = new ZooKeeper(
130                 "127.0.0.1:" + follower.getClientPort(),
131                 ClientBase.CONNECTION_TIMEOUT,
132                 followerWatch);
133         followerWatch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
134         Stat nodeAtFollower = followerZK.exists(nodePath, false);
135 
136         // Problem 1: Follower had one extra ephemeral node /e1
137         assertNull(nodeAtFollower, "ephemeral node must not exist");
138 
139         // Create the node with another session
140         Stat currentEphemeralNode = new Stat();
141         zk.create(nodePath, "2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, currentEphemeralNode);
142 
143         // close the session and newly created ephemeral node should be deleted
144         zk.close();
145 
146         SyncCallback cb = new SyncCallback();
147         followerZK.sync(nodePath, cb, null);
148         cb.sync.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
149 
150         nodeAtFollower = followerZK.exists(nodePath, false);
151 
152         // Problem 2: Before fix, after session close the ephemeral node
153         // was not getting deleted. But now after the fix after session close
154         // ephemeral node is getting deleted.
155         assertNull(nodeAtFollower, "After session close ephemeral node must be deleted");
156         followerZK.close();
157     }
158 
159     @AfterEach
tearDown()160     public void tearDown() {
161         // stop all severs
162         for (int i = 0; i < mt.length; i++) {
163             try {
164                 mt[i].shutdown();
165             } catch (InterruptedException e) {
166                 LOG.warn("Quorum Peer interrupted while shutting it down", e);
167             }
168         }
169     }
170 
getByServerState(MainThread[] mt, ServerState state)171     private QuorumPeer getByServerState(MainThread[] mt, ServerState state) {
172         for (int i = mt.length - 1; i >= 0; i--) {
173             QuorumPeer quorumPeer = mt[i].getQuorumPeer();
174             if (null != quorumPeer && state == quorumPeer.getPeerState()) {
175                 return quorumPeer;
176             }
177         }
178         return null;
179     }
180 
181     static class CustomQuorumPeer extends QuorumPeer {
182 
183         private boolean injectError = false;
184 
CustomQuorumPeer()185         public CustomQuorumPeer() throws SaslException {
186 
187         }
188 
189         @Override
makeFollower(FileTxnSnapLog logFactory)190         protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
191             return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {
192 
193                 @Override
194                 void readPacket(QuorumPacket pp) throws IOException {
195                     /**
196                      * In real scenario got SocketTimeoutException while reading
197                      * the packet from leader because of network problem, but
198                      * here throwing SocketTimeoutException based on whether
199                      * error is injected or not
200                      */
201                     super.readPacket(pp);
202                     if (injectError && pp.getType() == Leader.PROPOSAL) {
203                         String type = LearnerHandler.packetToString(pp);
204                         throw new SocketTimeoutException("Socket timeout while reading the packet for operation "
205                                                                  + type);
206                     }
207                 }
208 
209             };
210         }
211 
setInjectError(boolean injectError)212         public void setInjectError(boolean injectError) {
213             this.injectError = injectError;
214         }
215 
216     }
217 
218     static class MockTestQPMain extends TestQPMain {
219 
220         @Override
221         protected QuorumPeer getQuorumPeer() throws SaslException {
222             return new CustomQuorumPeer();
223         }
224 
225     }
226 
227     private static class SyncCallback implements AsyncCallback.VoidCallback {
228 
229         private final CountDownLatch sync = new CountDownLatch(1);
230 
231         @Override
232         public void processResult(int rc, String path, Object ctx) {
233             sync.countDown();
234         }
235 
236     }
237 
238 }
239