1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.zookeeper.test; 20 21 import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; 22 import static org.apache.zookeeper.test.ClientBase.verifyThreadTerminated; 23 import static org.junit.jupiter.api.Assertions.assertFalse; 24 import static org.junit.jupiter.api.Assertions.assertTrue; 25 import java.util.LinkedList; 26 import org.apache.zookeeper.AsyncCallback.DataCallback; 27 import org.apache.zookeeper.AsyncCallback.StringCallback; 28 import org.apache.zookeeper.AsyncCallback.VoidCallback; 29 import org.apache.zookeeper.CreateMode; 30 import org.apache.zookeeper.KeeperException; 31 import org.apache.zookeeper.TestableZooKeeper; 32 import org.apache.zookeeper.WatchedEvent; 33 import org.apache.zookeeper.ZKTestCase; 34 import org.apache.zookeeper.ZooDefs.Ids; 35 import org.apache.zookeeper.data.Stat; 36 import org.apache.zookeeper.test.ClientBase.CountdownWatcher; 37 import org.junit.jupiter.api.Test; 38 import org.slf4j.Logger; 39 import org.slf4j.LoggerFactory; 40 41 public class AsyncHammerTest extends ZKTestCase implements StringCallback, VoidCallback, DataCallback { 42 43 private static final Logger LOG = LoggerFactory.getLogger(AsyncHammerTest.class); 44 45 private QuorumBase qb = new QuorumBase(); 46 47 private volatile boolean bang; 48 setUp(boolean withObservers)49 public void setUp(boolean withObservers) throws Exception { 50 qb.setUp(withObservers); 51 } 52 restart()53 protected void restart() throws Exception { 54 LOG.info("RESTARTING {}", getTestName()); 55 qb.tearDown(); 56 57 // don't call setup - we don't want to reassign ports/dirs, etc... 58 JMXEnv.setUp(); 59 qb.startServers(); 60 } 61 tearDown()62 public void tearDown() throws Exception { 63 LOG.info("Test clients shutting down"); 64 qb.tearDown(); 65 } 66 67 /** 68 * Create /test- sequence nodes asynchronously, max 30 outstanding 69 */ 70 class HammerThread extends Thread implements StringCallback, VoidCallback { 71 72 private static final int MAX_OUTSTANDING = 30; 73 74 private TestableZooKeeper zk; 75 private int outstanding; 76 77 private volatile boolean failed = false; 78 HammerThread(String name)79 public HammerThread(String name) { 80 super(name); 81 } 82 run()83 public void run() { 84 try { 85 CountdownWatcher watcher = new CountdownWatcher(); 86 zk = new TestableZooKeeper(qb.hostPort, CONNECTION_TIMEOUT, watcher); 87 watcher.waitForConnected(CONNECTION_TIMEOUT); 88 while (bang) { 89 incOutstanding(); // before create otw race 90 zk.create("/test-", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, this, null); 91 } 92 } catch (InterruptedException e) { 93 if (bang) { 94 LOG.error("sanity check failed!!!"); // sanity check 95 return; 96 } 97 } catch (Exception e) { 98 LOG.error("Client create operation failed", e); 99 return; 100 } finally { 101 if (zk != null) { 102 try { 103 if (!zk.close(CONNECTION_TIMEOUT)) { 104 failed = true; 105 LOG.error("Client did not shutdown"); 106 } 107 } catch (InterruptedException e) { 108 LOG.info("Interrupted", e); 109 } 110 } 111 } 112 } 113 incOutstanding()114 private synchronized void incOutstanding() throws InterruptedException { 115 outstanding++; 116 while (outstanding > MAX_OUTSTANDING) { 117 wait(); 118 } 119 } 120 decOutstanding()121 private synchronized void decOutstanding() { 122 outstanding--; 123 assertTrue(outstanding >= 0, "outstanding >= 0"); 124 notifyAll(); 125 } 126 process(WatchedEvent event)127 public void process(WatchedEvent event) { 128 // ignore for purposes of this test 129 } 130 processResult(int rc, String path, Object ctx, String name)131 public void processResult(int rc, String path, Object ctx, String name) { 132 if (rc != KeeperException.Code.OK.intValue()) { 133 if (bang) { 134 failed = true; 135 LOG.error( 136 "Create failed for 0x{} with rc:{} path:{}", 137 Long.toHexString(zk.getSessionId()), 138 rc, 139 path); 140 } 141 decOutstanding(); 142 return; 143 } 144 try { 145 decOutstanding(); 146 zk.delete(name, -1, this, null); 147 } catch (Exception e) { 148 if (bang) { 149 failed = true; 150 LOG.error("Client delete failed", e); 151 } 152 } 153 } 154 processResult(int rc, String path, Object ctx)155 public void processResult(int rc, String path, Object ctx) { 156 if (rc != KeeperException.Code.OK.intValue()) { 157 if (bang) { 158 failed = true; 159 LOG.error( 160 "Delete failed for 0x{} with rc:{} path:{}", 161 Long.toHexString(zk.getSessionId()), 162 rc, 163 path); 164 } 165 } 166 } 167 168 } 169 170 @Test testHammer()171 public void testHammer() throws Exception { 172 setUp(false); 173 bang = true; 174 LOG.info("Starting hammers"); 175 HammerThread[] hammers = new HammerThread[100]; 176 for (int i = 0; i < hammers.length; i++) { 177 hammers[i] = new HammerThread("HammerThread-" + i); 178 hammers[i].start(); 179 } 180 LOG.info("Started hammers"); 181 Thread.sleep(5000); // allow the clients to run for max 5sec 182 bang = false; 183 LOG.info("Stopping hammers"); 184 for (int i = 0; i < hammers.length; i++) { 185 hammers[i].interrupt(); 186 verifyThreadTerminated(hammers[i], 60000); 187 assertFalse(hammers[i].failed); 188 } 189 190 // before restart 191 LOG.info("Hammers stopped, verifying consistency"); 192 qb.verifyRootOfAllServersMatch(qb.hostPort); 193 194 restart(); 195 196 // after restart 197 LOG.info("Verifying hammers 2"); 198 qb.verifyRootOfAllServersMatch(qb.hostPort); 199 tearDown(); 200 } 201 202 @Test testObserversHammer()203 public void testObserversHammer() throws Exception { 204 setUp(true); 205 bang = true; 206 Thread[] hammers = new Thread[100]; 207 for (int i = 0; i < hammers.length; i++) { 208 hammers[i] = new HammerThread("HammerThread-" + i); 209 hammers[i].start(); 210 } 211 Thread.sleep(5000); // allow the clients to run for max 5sec 212 bang = false; 213 for (int i = 0; i < hammers.length; i++) { 214 hammers[i].interrupt(); 215 verifyThreadTerminated(hammers[i], 60000); 216 } 217 // before restart 218 qb.verifyRootOfAllServersMatch(qb.hostPort); 219 tearDown(); 220 } 221 222 @SuppressWarnings("unchecked") processResult(int rc, String path, Object ctx, String name)223 public void processResult(int rc, String path, Object ctx, String name) { 224 synchronized (ctx) { 225 ((LinkedList<Integer>) ctx).add(rc); 226 ctx.notifyAll(); 227 } 228 } 229 230 @SuppressWarnings("unchecked") processResult(int rc, String path, Object ctx)231 public void processResult(int rc, String path, Object ctx) { 232 synchronized (ctx) { 233 ((LinkedList<Integer>) ctx).add(rc); 234 ctx.notifyAll(); 235 } 236 } 237 238 @SuppressWarnings("unchecked") processResult(int rc, String path, Object ctx, byte[] data, Stat stat)239 public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { 240 synchronized (ctx) { 241 ((LinkedList<Integer>) ctx).add(rc); 242 ctx.notifyAll(); 243 } 244 } 245 246 } 247