1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.zookeeper.server.quorum; 20 21 import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; 22 import static org.junit.jupiter.api.Assertions.assertEquals; 23 import static org.junit.jupiter.api.Assertions.assertNotNull; 24 import static org.junit.jupiter.api.Assertions.assertNull; 25 import static org.junit.jupiter.api.Assertions.assertTrue; 26 import java.io.IOException; 27 import java.net.SocketTimeoutException; 28 import java.util.concurrent.CountDownLatch; 29 import java.util.concurrent.TimeUnit; 30 import javax.security.sasl.SaslException; 31 import org.apache.zookeeper.AsyncCallback; 32 import org.apache.zookeeper.CreateMode; 33 import org.apache.zookeeper.PortAssignment; 34 import org.apache.zookeeper.ZooDefs.Ids; 35 import org.apache.zookeeper.ZooKeeper; 36 import org.apache.zookeeper.data.Stat; 37 import org.apache.zookeeper.server.persistence.FileTxnSnapLog; 38 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; 39 import org.apache.zookeeper.test.ClientBase; 40 import org.apache.zookeeper.test.ClientBase.CountdownWatcher; 41 import org.junit.jupiter.api.AfterEach; 42 import org.junit.jupiter.api.Test; 43 import org.junit.jupiter.api.Timeout; 44 45 public class EphemeralNodeDeletionTest extends QuorumPeerTestBase { 46 47 private static int SERVER_COUNT = 3; 48 private MainThread[] mt = new MainThread[SERVER_COUNT]; 49 50 /** 51 * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2355. 52 * ZooKeeper ephemeral node is never deleted if follower fail while reading 53 * the proposal packet. 54 */ 55 56 @Test 57 @Timeout(value = 120) testEphemeralNodeDeletion()58 public void testEphemeralNodeDeletion() throws Exception { 59 final int[] clientPorts = new int[SERVER_COUNT]; 60 StringBuilder sb = new StringBuilder(); 61 String server; 62 63 for (int i = 0; i < SERVER_COUNT; i++) { 64 clientPorts[i] = PortAssignment.unique(); 65 server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() 66 + ":participant;127.0.0.1:" + clientPorts[i]; 67 sb.append(server + "\n"); 68 } 69 String currentQuorumCfgSection = sb.toString(); 70 // start all the servers 71 for (int i = 0; i < SERVER_COUNT; i++) { 72 mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) { 73 @Override 74 public TestQPMain getTestQPMain() { 75 return new MockTestQPMain(); 76 } 77 }; 78 mt[i].start(); 79 } 80 81 // ensure all servers started 82 for (int i = 0; i < SERVER_COUNT; i++) { 83 assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT), 84 "waiting for server " + i + " being up"); 85 } 86 87 CountdownWatcher watch = new CountdownWatcher(); 88 ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[1], ClientBase.CONNECTION_TIMEOUT, watch); 89 watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT); 90 91 /** 92 * now the problem scenario starts 93 */ 94 95 Stat firstEphemeralNode = new Stat(); 96 97 // 1: create ephemeral node 98 String nodePath = "/e1"; 99 zk.create(nodePath, "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, firstEphemeralNode); 100 assertEquals(zk.getSessionId(), firstEphemeralNode.getEphemeralOwner(), 101 "Current session and ephemeral owner should be same"); 102 103 // 2: inject network problem in one of the follower 104 CustomQuorumPeer follower = (CustomQuorumPeer) getByServerState(mt, ServerState.FOLLOWING); 105 follower.setInjectError(true); 106 107 // 3: close the session so that ephemeral node is deleted 108 zk.close(); 109 110 // remove the error 111 follower.setInjectError(false); 112 113 assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + follower.getClientPort(), CONNECTION_TIMEOUT), 114 "Faulted Follower should have joined quorum by now"); 115 116 QuorumPeer leader = getByServerState(mt, ServerState.LEADING); 117 assertNotNull(leader, "Leader should not be null"); 118 assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + leader.getClientPort(), CONNECTION_TIMEOUT), 119 "Leader must be running"); 120 121 watch = new CountdownWatcher(); 122 zk = new ZooKeeper("127.0.0.1:" + leader.getClientPort(), ClientBase.CONNECTION_TIMEOUT, watch); 123 watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT); 124 125 Stat exists = zk.exists(nodePath, false); 126 assertNull(exists, "Node must have been deleted from leader"); 127 128 CountdownWatcher followerWatch = new CountdownWatcher(); 129 ZooKeeper followerZK = new ZooKeeper( 130 "127.0.0.1:" + follower.getClientPort(), 131 ClientBase.CONNECTION_TIMEOUT, 132 followerWatch); 133 followerWatch.waitForConnected(ClientBase.CONNECTION_TIMEOUT); 134 Stat nodeAtFollower = followerZK.exists(nodePath, false); 135 136 // Problem 1: Follower had one extra ephemeral node /e1 137 assertNull(nodeAtFollower, "ephemeral node must not exist"); 138 139 // Create the node with another session 140 Stat currentEphemeralNode = new Stat(); 141 zk.create(nodePath, "2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, currentEphemeralNode); 142 143 // close the session and newly created ephemeral node should be deleted 144 zk.close(); 145 146 SyncCallback cb = new SyncCallback(); 147 followerZK.sync(nodePath, cb, null); 148 cb.sync.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); 149 150 nodeAtFollower = followerZK.exists(nodePath, false); 151 152 // Problem 2: Before fix, after session close the ephemeral node 153 // was not getting deleted. But now after the fix after session close 154 // ephemeral node is getting deleted. 155 assertNull(nodeAtFollower, "After session close ephemeral node must be deleted"); 156 followerZK.close(); 157 } 158 159 @AfterEach tearDown()160 public void tearDown() { 161 // stop all severs 162 for (int i = 0; i < mt.length; i++) { 163 try { 164 mt[i].shutdown(); 165 } catch (InterruptedException e) { 166 LOG.warn("Quorum Peer interrupted while shutting it down", e); 167 } 168 } 169 } 170 getByServerState(MainThread[] mt, ServerState state)171 private QuorumPeer getByServerState(MainThread[] mt, ServerState state) { 172 for (int i = mt.length - 1; i >= 0; i--) { 173 QuorumPeer quorumPeer = mt[i].getQuorumPeer(); 174 if (null != quorumPeer && state == quorumPeer.getPeerState()) { 175 return quorumPeer; 176 } 177 } 178 return null; 179 } 180 181 static class CustomQuorumPeer extends QuorumPeer { 182 183 private boolean injectError = false; 184 CustomQuorumPeer()185 public CustomQuorumPeer() throws SaslException { 186 187 } 188 189 @Override makeFollower(FileTxnSnapLog logFactory)190 protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException { 191 return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) { 192 193 @Override 194 void readPacket(QuorumPacket pp) throws IOException { 195 /** 196 * In real scenario got SocketTimeoutException while reading 197 * the packet from leader because of network problem, but 198 * here throwing SocketTimeoutException based on whether 199 * error is injected or not 200 */ 201 super.readPacket(pp); 202 if (injectError && pp.getType() == Leader.PROPOSAL) { 203 String type = LearnerHandler.packetToString(pp); 204 throw new SocketTimeoutException("Socket timeout while reading the packet for operation " 205 + type); 206 } 207 } 208 209 }; 210 } 211 setInjectError(boolean injectError)212 public void setInjectError(boolean injectError) { 213 this.injectError = injectError; 214 } 215 216 } 217 218 static class MockTestQPMain extends TestQPMain { 219 220 @Override 221 protected QuorumPeer getQuorumPeer() throws SaslException { 222 return new CustomQuorumPeer(); 223 } 224 225 } 226 227 private static class SyncCallback implements AsyncCallback.VoidCallback { 228 229 private final CountDownLatch sync = new CountDownLatch(1); 230 231 @Override 232 public void processResult(int rc, String path, Object ctx) { 233 sync.countDown(); 234 } 235 236 } 237 238 } 239