1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.zookeeper.test;
20 
21 import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
22 import static org.apache.zookeeper.test.ClientBase.verifyThreadTerminated;
23 import static org.junit.jupiter.api.Assertions.assertFalse;
24 import static org.junit.jupiter.api.Assertions.assertTrue;
25 import java.util.LinkedList;
26 import org.apache.zookeeper.AsyncCallback.DataCallback;
27 import org.apache.zookeeper.AsyncCallback.StringCallback;
28 import org.apache.zookeeper.AsyncCallback.VoidCallback;
29 import org.apache.zookeeper.CreateMode;
30 import org.apache.zookeeper.KeeperException;
31 import org.apache.zookeeper.TestableZooKeeper;
32 import org.apache.zookeeper.WatchedEvent;
33 import org.apache.zookeeper.ZKTestCase;
34 import org.apache.zookeeper.ZooDefs.Ids;
35 import org.apache.zookeeper.data.Stat;
36 import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
37 import org.junit.jupiter.api.Test;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 
41 public class AsyncHammerTest extends ZKTestCase implements StringCallback, VoidCallback, DataCallback {
42 
43     private static final Logger LOG = LoggerFactory.getLogger(AsyncHammerTest.class);
44 
45     private QuorumBase qb = new QuorumBase();
46 
47     private volatile boolean bang;
48 
setUp(boolean withObservers)49     public void setUp(boolean withObservers) throws Exception {
50         qb.setUp(withObservers);
51     }
52 
restart()53     protected void restart() throws Exception {
54         LOG.info("RESTARTING {}", getTestName());
55         qb.tearDown();
56 
57         // don't call setup - we don't want to reassign ports/dirs, etc...
58         JMXEnv.setUp();
59         qb.startServers();
60     }
61 
tearDown()62     public void tearDown() throws Exception {
63         LOG.info("Test clients shutting down");
64         qb.tearDown();
65     }
66 
67     /**
68      * Create /test- sequence nodes asynchronously, max 30 outstanding
69      */
70     class HammerThread extends Thread implements StringCallback, VoidCallback {
71 
72         private static final int MAX_OUTSTANDING = 30;
73 
74         private TestableZooKeeper zk;
75         private int outstanding;
76 
77         private volatile boolean failed = false;
78 
HammerThread(String name)79         public HammerThread(String name) {
80             super(name);
81         }
82 
run()83         public void run() {
84             try {
85                 CountdownWatcher watcher = new CountdownWatcher();
86                 zk = new TestableZooKeeper(qb.hostPort, CONNECTION_TIMEOUT, watcher);
87                 watcher.waitForConnected(CONNECTION_TIMEOUT);
88                 while (bang) {
89                     incOutstanding(); // before create otw race
90                     zk.create("/test-", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, this, null);
91                 }
92             } catch (InterruptedException e) {
93                 if (bang) {
94                     LOG.error("sanity check failed!!!"); // sanity check
95                     return;
96                 }
97             } catch (Exception e) {
98                 LOG.error("Client create operation failed", e);
99                 return;
100             } finally {
101                 if (zk != null) {
102                     try {
103                         if (!zk.close(CONNECTION_TIMEOUT)) {
104                             failed = true;
105                             LOG.error("Client did not shutdown");
106                         }
107                     } catch (InterruptedException e) {
108                         LOG.info("Interrupted", e);
109                     }
110                 }
111             }
112         }
113 
incOutstanding()114         private synchronized void incOutstanding() throws InterruptedException {
115             outstanding++;
116             while (outstanding > MAX_OUTSTANDING) {
117                 wait();
118             }
119         }
120 
decOutstanding()121         private synchronized void decOutstanding() {
122             outstanding--;
123             assertTrue(outstanding >= 0, "outstanding >= 0");
124             notifyAll();
125         }
126 
process(WatchedEvent event)127         public void process(WatchedEvent event) {
128             // ignore for purposes of this test
129         }
130 
processResult(int rc, String path, Object ctx, String name)131         public void processResult(int rc, String path, Object ctx, String name) {
132             if (rc != KeeperException.Code.OK.intValue()) {
133                 if (bang) {
134                     failed = true;
135                     LOG.error(
136                         "Create failed for 0x{} with rc:{} path:{}",
137                         Long.toHexString(zk.getSessionId()),
138                         rc,
139                         path);
140                 }
141                 decOutstanding();
142                 return;
143             }
144             try {
145                 decOutstanding();
146                 zk.delete(name, -1, this, null);
147             } catch (Exception e) {
148                 if (bang) {
149                     failed = true;
150                     LOG.error("Client delete failed", e);
151                 }
152             }
153         }
154 
processResult(int rc, String path, Object ctx)155         public void processResult(int rc, String path, Object ctx) {
156             if (rc != KeeperException.Code.OK.intValue()) {
157                 if (bang) {
158                     failed = true;
159                     LOG.error(
160                         "Delete failed for 0x{} with rc:{} path:{}",
161                         Long.toHexString(zk.getSessionId()),
162                         rc,
163                         path);
164                 }
165             }
166         }
167 
168     }
169 
170     @Test
testHammer()171     public void testHammer() throws Exception {
172         setUp(false);
173         bang = true;
174         LOG.info("Starting hammers");
175         HammerThread[] hammers = new HammerThread[100];
176         for (int i = 0; i < hammers.length; i++) {
177             hammers[i] = new HammerThread("HammerThread-" + i);
178             hammers[i].start();
179         }
180         LOG.info("Started hammers");
181         Thread.sleep(5000); // allow the clients to run for max 5sec
182         bang = false;
183         LOG.info("Stopping hammers");
184         for (int i = 0; i < hammers.length; i++) {
185             hammers[i].interrupt();
186             verifyThreadTerminated(hammers[i], 60000);
187             assertFalse(hammers[i].failed);
188         }
189 
190         // before restart
191         LOG.info("Hammers stopped, verifying consistency");
192         qb.verifyRootOfAllServersMatch(qb.hostPort);
193 
194         restart();
195 
196         // after restart
197         LOG.info("Verifying hammers 2");
198         qb.verifyRootOfAllServersMatch(qb.hostPort);
199         tearDown();
200     }
201 
202     @Test
testObserversHammer()203     public void testObserversHammer() throws Exception {
204         setUp(true);
205         bang = true;
206         Thread[] hammers = new Thread[100];
207         for (int i = 0; i < hammers.length; i++) {
208             hammers[i] = new HammerThread("HammerThread-" + i);
209             hammers[i].start();
210         }
211         Thread.sleep(5000); // allow the clients to run for max 5sec
212         bang = false;
213         for (int i = 0; i < hammers.length; i++) {
214             hammers[i].interrupt();
215             verifyThreadTerminated(hammers[i], 60000);
216         }
217         // before restart
218         qb.verifyRootOfAllServersMatch(qb.hostPort);
219         tearDown();
220     }
221 
222     @SuppressWarnings("unchecked")
processResult(int rc, String path, Object ctx, String name)223     public void processResult(int rc, String path, Object ctx, String name) {
224         synchronized (ctx) {
225             ((LinkedList<Integer>) ctx).add(rc);
226             ctx.notifyAll();
227         }
228     }
229 
230     @SuppressWarnings("unchecked")
processResult(int rc, String path, Object ctx)231     public void processResult(int rc, String path, Object ctx) {
232         synchronized (ctx) {
233             ((LinkedList<Integer>) ctx).add(rc);
234             ctx.notifyAll();
235         }
236     }
237 
238     @SuppressWarnings("unchecked")
processResult(int rc, String path, Object ctx, byte[] data, Stat stat)239     public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
240         synchronized (ctx) {
241             ((LinkedList<Integer>) ctx).add(rc);
242             ctx.notifyAll();
243         }
244     }
245 
246 }
247