1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.zookeeper;
20 
21 import static org.hamcrest.CoreMatchers.is;
22 import static org.hamcrest.MatcherAssert.assertThat;
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.junit.jupiter.api.Assertions.assertFalse;
25 import static org.junit.jupiter.api.Assertions.assertNotNull;
26 import static org.junit.jupiter.api.Assertions.assertNull;
27 import static org.junit.jupiter.api.Assertions.assertTrue;
28 import static org.junit.jupiter.api.Assertions.fail;
29 import static org.mockito.Mockito.doReturn;
30 import static org.mockito.Mockito.spy;
31 import java.io.IOException;
32 import java.util.ArrayList;
33 import java.util.HashSet;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import org.apache.commons.collections4.CollectionUtils;
41 import org.apache.zookeeper.KeeperException.Code;
42 import org.apache.zookeeper.Watcher.Event.EventType;
43 import org.apache.zookeeper.Watcher.WatcherType;
44 import org.apache.zookeeper.ZooDefs.Ids;
45 import org.apache.zookeeper.server.ServerCnxn;
46 import org.apache.zookeeper.test.ClientBase;
47 import org.junit.jupiter.api.AfterEach;
48 import org.junit.jupiter.api.BeforeEach;
49 import org.junit.jupiter.api.Timeout;
50 import org.junit.jupiter.params.ParameterizedTest;
51 import org.junit.jupiter.params.provider.ValueSource;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54 
55 /**
 * Verifies removal of watches through the ZooKeeper client APIs.
57  */
58 public class RemoveWatchesTest extends ClientBase {
59 
60     private static final Logger LOG = LoggerFactory.getLogger(RemoveWatchesTest.class);
61     private ZooKeeper zk1 = null;
62     private ZooKeeper zk2 = null;
63 
64     @BeforeEach
65     @Override
setUp()66     public void setUp() throws Exception {
67         super.setUp();
68         zk1 = createClient();
69         zk2 = createClient();
70     }
71 
72     @AfterEach
73     @Override
tearDown()74     public void tearDown() throws Exception {
75         if (zk1 != null) {
76             zk1.close();
77         }
78         if (zk2 != null) {
79             zk2.close();
80         }
81         super.tearDown();
82     }
83 
removeWatches( ZooKeeper zk, String path, Watcher watcher, WatcherType watcherType, boolean local, KeeperException.Code rc, boolean useAsync)84     private void removeWatches(
85         ZooKeeper zk,
86         String path,
87         Watcher watcher,
88         WatcherType watcherType,
89         boolean local,
90         KeeperException.Code rc,
91         boolean useAsync) throws InterruptedException, KeeperException {
92         LOG.info("Sending removeWatches req using zk {} path: {} type: {} watcher: {} ", zk, path, watcherType, watcher);
93         if (useAsync) {
94             MyCallback c1 = new MyCallback(rc.intValue(), path);
95             zk.removeWatches(path, watcher, watcherType, local, c1, null);
96             assertTrue(c1.matches(), "Didn't succeeds removeWatch operation");
97             if (KeeperException.Code.OK.intValue() != c1.rc) {
98                 KeeperException ke = KeeperException.create(KeeperException.Code.get(c1.rc));
99                 throw ke;
100             }
101         } else {
102             zk.removeWatches(path, watcher, watcherType, local);
103         }
104     }
105 
removeAllWatches( ZooKeeper zk, String path, WatcherType watcherType, boolean local, KeeperException.Code rc, boolean useAsync)106     private void removeAllWatches(
107         ZooKeeper zk,
108         String path,
109         WatcherType watcherType,
110         boolean local,
111         KeeperException.Code rc,
112         boolean useAsync) throws InterruptedException, KeeperException {
113         LOG.info("Sending removeWatches req using zk {} path: {} type: {} ", zk, path, watcherType);
114         if (useAsync) {
115             MyCallback c1 = new MyCallback(rc.intValue(), path);
116             zk.removeAllWatches(path, watcherType, local, c1, null);
117             assertTrue(c1.matches(), "Didn't succeeds removeWatch operation");
118             if (KeeperException.Code.OK.intValue() != c1.rc) {
119                 KeeperException ke = KeeperException.create(KeeperException.Code.get(c1.rc));
120                 throw ke;
121             }
122         } else {
123             zk.removeAllWatches(path, watcherType, local);
124         }
125     }
126 
127     /**
128      * Test verifies removal of single watcher when there is server connection
129      */
130     @ParameterizedTest
131     @ValueSource(booleans = {true, false})
132     @Timeout(value = 90)
testRemoveSingleWatcher(boolean useAsync)133     public void testRemoveSingleWatcher(boolean useAsync) throws Exception {
134         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
135         zk1.create("/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
136         MyWatcher w1 = new MyWatcher("/node1", 1);
137         LOG.info("Adding data watcher {} on path {}", w1, "/node1");
138         assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
139         MyWatcher w2 = new MyWatcher("/node2", 1);
140         LOG.info("Adding data watcher {} on path {}", w2, "/node1");
141         assertNotNull(zk2.exists("/node2", w2), "Didn't set data watches");
142         removeWatches(zk2, "/node1", w1, WatcherType.Data, false, Code.OK, useAsync);
143         assertEquals(1, zk2.getDataWatches().size(), "Didn't find data watcher");
144         assertEquals("/node2", zk2.getDataWatches().get(0), "Didn't find data watcher");
145         removeWatches(zk2, "/node2", w2, WatcherType.Any, false, Code.OK, useAsync);
146         assertTrue(w2.matches(), "Didn't remove data watcher");
147         // closing session should remove ephemeral nodes and trigger data
148         // watches if any
149         if (zk1 != null) {
150             zk1.close();
151             zk1 = null;
152         }
153 
154         List<EventType> events = w1.getEventsAfterWatchRemoval();
155         assertFalse(events.contains(EventType.NodeDeleted), "Shouldn't get NodeDeletedEvent after watch removal");
156         assertEquals(0, events.size(), "Shouldn't get NodeDeletedEvent after watch removal");
157     }
158 
159     /**
160      * Test verifies removal of multiple data watchers when there is server
161      * connection
162      */
163     @ParameterizedTest
164     @ValueSource(booleans = {true, false})
165     @Timeout(value = 90)
testMultipleDataWatchers(boolean useAsync)166     public void testMultipleDataWatchers(boolean useAsync) throws IOException, InterruptedException, KeeperException {
167         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
168         MyWatcher w1 = new MyWatcher("/node1", 1);
169         LOG.info("Adding data watcher {} on path {}", w1, "/node1");
170         assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
171         MyWatcher w2 = new MyWatcher("/node1", 1);
172         LOG.info("Adding data watcher {} on path {}", w2, "/node1");
173         assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");
174         removeWatches(zk2, "/node1", w2, WatcherType.Data, false, Code.OK, useAsync);
175         assertEquals(1, zk2.getDataWatches().size(), "Didn't find data watcher");
176         assertEquals("/node1", zk2.getDataWatches().get(0), "Didn't find data watcher");
177         removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK, useAsync);
178         assertTrue(w2.matches(), "Didn't remove data watcher");
179         // closing session should remove ephemeral nodes and trigger data
180         // watches if any
181         if (zk1 != null) {
182             zk1.close();
183             zk1 = null;
184         }
185 
186         List<EventType> events = w2.getEventsAfterWatchRemoval();
187         assertEquals(0, events.size(), "Shouldn't get NodeDeletedEvent after watch removal");
188     }
189 
190     /**
191      * Test verifies removal of multiple child watchers when there is server
192      * connection
193      */
194     @ParameterizedTest
195     @ValueSource(booleans = {true, false})
196     @Timeout(value = 90)
testMultipleChildWatchers(boolean useAsync)197     public void testMultipleChildWatchers(boolean useAsync) throws IOException, InterruptedException, KeeperException {
198         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
199         MyWatcher w1 = new MyWatcher("/node1", 1);
200         LOG.info("Adding child watcher {} on path {}", w1, "/node1");
201         zk2.getChildren("/node1", w1);
202         MyWatcher w2 = new MyWatcher("/node1", 1);
203         LOG.info("Adding child watcher {} on path {}", w2, "/node1");
204         zk2.getChildren("/node1", w2);
205         removeWatches(zk2, "/node1", w2, WatcherType.Children, false, Code.OK, useAsync);
206         assertTrue(w2.matches(), "Didn't remove child watcher");
207         assertEquals(1, zk2.getChildWatches().size(), "Didn't find child watcher");
208         removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK, useAsync);
209         assertTrue(w1.matches(), "Didn't remove child watcher");
210         // create child to see NodeChildren notification
211         zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
212         // waiting for child watchers to be notified
213         int count = 30;
214         while (count > 0) {
215             if (w1.getEventsAfterWatchRemoval().size() > 0) {
216                 break;
217             }
218             count--;
219             Thread.sleep(100);
220         }
221         // watcher2
222         List<EventType> events = w2.getEventsAfterWatchRemoval();
223         assertEquals(0, events.size(), "Shouldn't get NodeChildrenChanged event");
224     }
225 
226     /**
227      * Test verifies null watcher with WatcherType.Any - remove all the watchers
228      * data, child, exists
229      */
230     @ParameterizedTest
231     @ValueSource(booleans = {true, false})
232     @Timeout(value = 90)
testRemoveAllWatchers(boolean useAsync)233     public void testRemoveAllWatchers(boolean useAsync) throws IOException, InterruptedException, KeeperException {
234         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
235         MyWatcher w1 = new MyWatcher("/node1", 2);
236         MyWatcher w2 = new MyWatcher("/node1", 2);
237         LOG.info("Adding data watcher {} on path {}", w1, "/node1");
238         assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
239         LOG.info("Adding data watcher {} on path {}", w2, "/node1");
240         assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");
241         LOG.info("Adding child watcher {} on path {}", w1, "/node1");
242         zk2.getChildren("/node1", w1);
243         LOG.info("Adding child watcher {} on path {}", w2, "/node1");
244         zk2.getChildren("/node1", w2);
245         removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK, useAsync);
246         removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK, useAsync);
247         zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
248         assertTrue(w1.matches(), "Didn't remove data watcher");
249         assertTrue(w2.matches(), "Didn't remove child watcher");
250     }
251 
252     /**
253      * Test verifies null watcher with WatcherType.Data - remove all data
254      * watchers. Child watchers shouldn't be removed
255      */
256     @ParameterizedTest
257     @ValueSource(booleans = {true, false})
258     @Timeout(value = 90)
testRemoveAllDataWatchers(boolean useAsync)259     public void testRemoveAllDataWatchers(boolean useAsync) throws IOException, InterruptedException, KeeperException {
260         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
261         MyWatcher w1 = new MyWatcher("/node1", 1);
262         MyWatcher w2 = new MyWatcher("/node1", 1);
263         LOG.info("Adding data watcher {} on path {}", w1, "/node1");
264         assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
265         LOG.info("Adding data watcher {} on path {}", w2, "/node1");
266         assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");
267         LOG.info("Adding child watcher {} on path {}", w1, "/node1");
268         zk2.getChildren("/node1", w1);
269         LOG.info("Adding child watcher {} on path {}", w2, "/node1");
270         zk2.getChildren("/node1", w2);
271         removeWatches(zk2, "/node1", w1, WatcherType.Data, false, Code.OK, useAsync);
272         removeWatches(zk2, "/node1", w2, WatcherType.Data, false, Code.OK, useAsync);
273         zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
274         assertTrue(w1.matches(), "Didn't remove data watcher");
275         assertTrue(w2.matches(), "Didn't remove data watcher");
276         // waiting for child watchers to be notified
277         int count = 10;
278         while (count > 0) {
279             if (w1.getEventsAfterWatchRemoval().size() > 0 && w2.getEventsAfterWatchRemoval().size() > 0) {
280                 break;
281             }
282             count--;
283             Thread.sleep(1000);
284         }
285         // watcher1
286         List<EventType> events = w1.getEventsAfterWatchRemoval();
287         assertEquals(1, events.size(), "Didn't get NodeChildrenChanged event");
288         assertTrue(events.contains(EventType.NodeChildrenChanged), "Didn't get NodeChildrenChanged event");
289         // watcher2
290         events = w2.getEventsAfterWatchRemoval();
291         assertEquals(1, events.size(), "Didn't get NodeChildrenChanged event");
292         assertTrue(events.contains(EventType.NodeChildrenChanged), "Didn't get NodeChildrenChanged event");
293     }
294 
295     /**
296      * Test verifies null watcher with WatcherType.Children - remove all child
297      * watchers. Data watchers shouldn't be removed
298      */
299     @ParameterizedTest
300     @ValueSource(booleans = {true, false})
301     @Timeout(value = 90)
testRemoveAllChildWatchers(boolean useAsync)302     public void testRemoveAllChildWatchers(boolean useAsync) throws IOException, InterruptedException, KeeperException {
303         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
304         MyWatcher w1 = new MyWatcher("/node1", 1);
305         MyWatcher w2 = new MyWatcher("/node1", 1);
306         LOG.info("Adding data watcher {} on path {}", w1, "/node1");
307         assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
308         LOG.info("Adding data watcher {} on path {}", w2, "/node1");
309         assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");
310         LOG.info("Adding child watcher {} on path {}", w1, "/node1");
311         zk2.getChildren("/node1", w1);
312         LOG.info("Adding child watcher {} on path {}", w2, "/node1");
313         zk2.getChildren("/node1", w2);
314         removeWatches(zk2, "/node1", w1, WatcherType.Children, false, Code.OK, useAsync);
315         removeWatches(zk2, "/node1", w2, WatcherType.Children, false, Code.OK, useAsync);
316         zk1.setData("/node1", "test".getBytes(), -1);
317         assertTrue(w1.matches(), "Didn't remove child watcher");
318         assertTrue(w2.matches(), "Didn't remove child watcher");
319         // waiting for child watchers to be notified
320         int count = 10;
321         while (count > 0) {
322             if (w1.getEventsAfterWatchRemoval().size() > 0 && w2.getEventsAfterWatchRemoval().size() > 0) {
323                 break;
324             }
325             count--;
326             Thread.sleep(1000);
327         }
328         // watcher1
329         List<EventType> events = w1.getEventsAfterWatchRemoval();
330         assertEquals(1, events.size(), "Didn't get NodeDataChanged event");
331         assertTrue(events.contains(EventType.NodeDataChanged), "Didn't get NodeDataChanged event");
332         // watcher2
333         events = w2.getEventsAfterWatchRemoval();
334         assertEquals(1, events.size(), "Didn't get NodeDataChanged event");
335         assertTrue(events.contains(EventType.NodeDataChanged), "Didn't get NodeDataChanged event");
336     }
337 
338     /**
339      * Test verifies given watcher doesn't exists!
340      */
341     @ParameterizedTest
342     @ValueSource(booleans = {true, false})
343     @Timeout(value = 90)
testNoWatcherException(boolean useAsync)344     public void testNoWatcherException(boolean useAsync) throws IOException, InterruptedException, KeeperException {
345         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
346         MyWatcher w1 = new MyWatcher("/node1", 2);
347         MyWatcher w2 = new MyWatcher("/node1", 2);
348         LOG.info("Adding data watcher {} on path {}", w1, "/node1");
349         assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
350         LOG.info("Adding data watcher {} on path {}", w2, "/node1");
351         assertNull(zk2.exists("/node2", w2), "Didn't set data watches");
352         LOG.info("Adding child watcher {} on path {}", w1, "/node1");
353         zk2.getChildren("/node1", w1);
354         LOG.info("Adding child watcher {} on path {}", w2, "/node1");
355         zk2.getChildren("/node1", w2);
356 
357         // New Watcher which will be used for removal
358         MyWatcher w3 = new MyWatcher("/node1", 2);
359 
360         try {
361             removeWatches(zk2, "/node1", w3, WatcherType.Any, false, Code.NOWATCHER, useAsync);
362             fail("Should throw exception as given watcher doesn't exists");
363         } catch (KeeperException.NoWatcherException nwe) {
364             // expected
365         }
366         try {
367             removeWatches(zk2, "/node1", w3, WatcherType.Children, false, Code.NOWATCHER, useAsync);
368             fail("Should throw exception as given watcher doesn't exists");
369         } catch (KeeperException.NoWatcherException nwe) {
370             // expected
371         }
372         try {
373             removeWatches(zk2, "/node1", w3, WatcherType.Data, false, Code.NOWATCHER, useAsync);
374             fail("Should throw exception as given watcher doesn't exists");
375         } catch (KeeperException.NoWatcherException nwe) {
376             // expected
377         }
378         try {
379             removeWatches(zk2, "/nonexists", w3, WatcherType.Data, false, Code.NOWATCHER, useAsync);
380             fail("Should throw exception as given watcher doesn't exists");
381         } catch (KeeperException.NoWatcherException nwe) {
382             // expected
383         }
384     }
385 
386     /**
387      * Test verifies WatcherType.Any - removes only the configured data watcher
388      * function
389      */
390     @ParameterizedTest
391     @ValueSource(booleans = {true, false})
392     @Timeout(value = 90)
testRemoveAnyDataWatcher(boolean useAsync)393     public void testRemoveAnyDataWatcher(boolean useAsync) throws Exception {
394         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
395         MyWatcher w1 = new MyWatcher("/node1", 1);
396         MyWatcher w2 = new MyWatcher("/node1", 2);
397         // Add multiple data watches
398         LOG.info("Adding data watcher {} on path {}", w1, "/node1");
399         assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
400         LOG.info("Adding data watcher {} on path {}", w2, "/node1");
401         assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");
402         // Add child watch
403         LOG.info("Adding child watcher {} on path {}", w2, "/node1");
404         zk2.getChildren("/node1", w2);
405         removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK, useAsync);
406         assertTrue(w1.matches(), "Didn't remove data watcher");
407         assertEquals(1, zk2.getChildWatches().size(), "Didn't find child watcher");
408         assertEquals(1, zk2.getDataWatches().size(), "Didn't find data watcher");
409         removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK, useAsync);
410         assertTrue(w2.matches(), "Didn't remove child watcher");
411     }
412 
413     /**
414      * Test verifies WatcherType.Any - removes only the configured child watcher
415      * function
416      */
417     @ParameterizedTest
418     @ValueSource(booleans = {true, false})
419     @Timeout(value = 90)
testRemoveAnyChildWatcher(boolean useAsync)420     public void testRemoveAnyChildWatcher(boolean useAsync) throws Exception {
421         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
422         MyWatcher w1 = new MyWatcher("/node1", 2);
423         MyWatcher w2 = new MyWatcher("/node1", 1);
424         LOG.info("Adding data watcher {} on path {}", w1, "/node1");
425         assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
426         // Add multiple child watches
427         LOG.info("Adding child watcher {} on path {}", w1, "/node1");
428         zk2.getChildren("/node1", w2);
429         LOG.info("Adding child watcher {} on path {}", w2, "/node1");
430         zk2.getChildren("/node1", w1);
431         removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK, useAsync);
432         assertTrue(w2.matches(), "Didn't remove child watcher");
433         assertEquals(1, zk2.getChildWatches().size(), "Didn't find child watcher");
434         assertEquals(1, zk2.getDataWatches().size(), "Didn't find data watcher");
435         removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK, useAsync);
436         assertTrue(w1.matches(), "Didn't remove watchers");
437     }
438 
439     /**
440      * Test verifies when there is no server connection. Remove watches when
441      * local=true, otw should retain it
442      */
443     @ParameterizedTest
444     @ValueSource(booleans = {true, false})
445     @Timeout(value = 90)
testRemoveWatcherWhenNoConnection(boolean useAsync)446     public void testRemoveWatcherWhenNoConnection(boolean useAsync) throws Exception {
447         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
448         MyWatcher w1 = new MyWatcher("/node1", 2);
449         MyWatcher w2 = new MyWatcher("/node1", 1);
450         LOG.info("Adding data watcher {} on path {}", w1, "/node1");
451         assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
452         // Add multiple child watches
453         LOG.info("Adding child watcher {} on path {}", w1, "/node1");
454         zk2.getChildren("/node1", w1);
455         LOG.info("Adding child watcher {} on path {}", w1, "/node1");
456         zk2.getChildren("/node1", w2);
457         stopServer();
458         removeWatches(zk2, "/node1", w2, WatcherType.Any, true, Code.OK, useAsync);
459         assertTrue(w2.matches(), "Didn't remove child watcher");
460         assertFalse(w1.matches(), "Shouldn't remove data watcher");
461         try {
462             removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.CONNECTIONLOSS, useAsync);
463             fail("Should throw exception as last watch removal requires server connection");
464         } catch (KeeperException.ConnectionLossException nwe) {
465             // expected
466         }
467         assertFalse(w1.matches(), "Shouldn't remove data watcher");
468 
469         // when local=true, here if connection not available, simply removes
470         // from local session
471         removeWatches(zk2, "/node1", w1, WatcherType.Any, true, Code.OK, useAsync);
472         assertTrue(w1.matches(), "Didn't remove data watcher");
473     }
474 
475     /**
476      * Test verifies many pre-node watchers. Also, verifies internal
477      * datastructure 'watchManager.existWatches'
478      */
479     @ParameterizedTest
480     @ValueSource(booleans = {true, false})
481     @Timeout(value = 90)
testManyPreNodeWatchers(boolean useAsync)482     public void testManyPreNodeWatchers(boolean useAsync) throws Exception {
483         int count = 50;
484         List<MyWatcher> wList = new ArrayList<MyWatcher>(count);
485         MyWatcher w;
486         String path = "/node";
487         // Exists watcher
488         for (int i = 0; i < count; i++) {
489             final String nodePath = path + i;
490             w = new MyWatcher(nodePath, 1);
491             wList.add(w);
492             LOG.info("Adding pre node watcher {} on path {}", w, nodePath);
493             zk1.exists(nodePath, w);
494         }
495         assertEquals(count, zk1.getExistWatches().size(), "Failed to add watchers!");
496         for (int i = 0; i < count; i++) {
497             final MyWatcher watcher = wList.get(i);
498             removeWatches(zk1, path + i, watcher, WatcherType.Data, false, Code.OK, useAsync);
499             assertTrue(watcher.matches(), "Didn't remove data watcher");
500         }
501         assertEquals(0, zk1.getExistWatches().size(), "Didn't remove watch references!");
502     }
503 
504     /**
505      * Test verifies many child watchers. Also, verifies internal datastructure
506      * 'watchManager.childWatches'
507      */
508     @ParameterizedTest
509     @ValueSource(booleans = {true, false})
510     @Timeout(value = 90)
testManyChildWatchers(boolean useAsync)511     public void testManyChildWatchers(boolean useAsync) throws Exception {
512         int count = 50;
513         List<MyWatcher> wList = new ArrayList<MyWatcher>(count);
514         MyWatcher w;
515         String path = "/node";
516 
517         // Child watcher
518         for (int i = 0; i < count; i++) {
519             String nodePath = path + i;
520             zk1.create(nodePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
521             nodePath += "/";
522         }
523         for (int i = 0; i < count; i++) {
524             String nodePath = path + i;
525             w = new MyWatcher(path + i, 1);
526             wList.add(w);
527             LOG.info("Adding child watcher {} on path {}", w, nodePath);
528             zk1.getChildren(nodePath, w);
529             nodePath += "/";
530         }
531         assertEquals(count, zk1.getChildWatches().size(), "Failed to add watchers!");
532         for (int i = 0; i < count; i++) {
533             final MyWatcher watcher = wList.get(i);
534             removeWatches(zk1, path + i, watcher, WatcherType.Children, false, Code.OK, useAsync);
535             assertTrue(watcher.matches(), "Didn't remove child watcher");
536         }
537         assertEquals(0, zk1.getChildWatches().size(), "Didn't remove watch references!");
538     }
539 
540     /**
541      * Test verifies many data watchers. Also, verifies internal datastructure
542      * 'watchManager.dataWatches'
543      */
544     @ParameterizedTest
545     @ValueSource(booleans = {true, false})
546     @Timeout(value = 90)
testManyDataWatchers(boolean useAsync)547     public void testManyDataWatchers(boolean useAsync) throws Exception {
548         int count = 50;
549         List<MyWatcher> wList = new ArrayList<MyWatcher>(count);
550         MyWatcher w;
551         String path = "/node";
552 
553         // Data watcher
554         for (int i = 0; i < count; i++) {
555             String nodePath = path + i;
556             w = new MyWatcher(path + i, 1);
557             wList.add(w);
558             zk1.create(nodePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
559             LOG.info("Adding data watcher {} on path {}", w, nodePath);
560             zk1.getData(nodePath, w, null);
561             nodePath += "/";
562         }
563         assertEquals(count, zk1.getDataWatches().size(), "Failed to add watchers!");
564         for (int i = 0; i < count; i++) {
565             final MyWatcher watcher = wList.get(i);
566             removeWatches(zk1, path + i, watcher, WatcherType.Data, false, Code.OK, useAsync);
567             assertTrue(watcher.matches(), "Didn't remove data watcher");
568         }
569         assertEquals(0, zk1.getDataWatches().size(), "Didn't remove watch references!");
570     }
571 
572     /**
573      * Test verifies removal of many watchers locally when no connection and
574      * WatcherType#Any. Also, verifies internal watchManager datastructures
575      */
576     @ParameterizedTest
577     @ValueSource(booleans = {true, false})
578     @Timeout(value = 90)
testManyWatchersWhenNoConnection(boolean useAsync)579     public void testManyWatchersWhenNoConnection(boolean useAsync) throws Exception {
580         int count = 3;
581         List<MyWatcher> wList = new ArrayList<MyWatcher>(count);
582         MyWatcher w;
583         String path = "/node";
584 
585         // Child watcher
586         for (int i = 0; i < count; i++) {
587             String nodePath = path + i;
588             zk1.create(nodePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
589             nodePath += "/";
590         }
591         for (int i = 0; i < count; i++) {
592             String nodePath = path + i;
593             w = new MyWatcher(path + i, 2);
594             wList.add(w);
595             LOG.info("Adding child watcher {} on path {}", w, nodePath);
596             zk1.getChildren(nodePath, w);
597             nodePath += "/";
598         }
599         assertEquals(count, zk1.getChildWatches().size(), "Failed to add watchers!");
600 
601         // Data watcher
602         for (int i = 0; i < count; i++) {
603             String nodePath = path + i;
604             w = wList.get(i);
605             LOG.info("Adding data watcher {} on path {}", w, nodePath);
606             zk1.getData(nodePath, w, null);
607             nodePath += "/";
608         }
609         assertEquals(count, zk1.getDataWatches().size(), "Failed to add watchers!");
610         stopServer();
611         for (int i = 0; i < count; i++) {
612             final MyWatcher watcher = wList.get(i);
613             removeWatches(zk1, path + i, watcher, WatcherType.Any, true, Code.OK, useAsync);
614             assertTrue(watcher.matches(), "Didn't remove watcher");
615         }
616         assertEquals(0, zk1.getChildWatches().size(), "Didn't remove watch references!");
617         assertEquals(0, zk1.getDataWatches().size(), "Didn't remove watch references!");
618     }
619 
620     /**
621      * Test verifies removing watcher having namespace
622      */
623     @ParameterizedTest
624     @ValueSource(booleans = {true, false})
625     @Timeout(value = 90)
testChRootRemoveWatcher(boolean useAsync)626     public void testChRootRemoveWatcher(boolean useAsync) throws Exception {
627         // creating the subtree for chRoot clients.
628         String chRoot = "/appsX";
629         zk1.create("/appsX", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
630         if (zk1 != null) {
631             zk1.close();
632         }
633         if (zk2 != null) {
634             zk2.close();
635         }
636         // Creating chRoot client.
637         zk1 = createClient(this.hostPort + chRoot);
638         zk2 = createClient(this.hostPort + chRoot);
639 
640         LOG.info("Creating child znode /node1 using chRoot client");
641         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
642         MyWatcher w1 = new MyWatcher("/node1", 2);
643         MyWatcher w2 = new MyWatcher("/node1", 1);
644         LOG.info("Adding data watcher {} on path {}", w1, "/node1");
645         assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
646         // Add multiple child watches
647         LOG.info("Adding child watcher {} on path {}", w1, "/node1");
648         zk2.getChildren("/node1", w2);
649         LOG.info("Adding child watcher {} on path {}", w2, "/node1");
650         zk2.getChildren("/node1", w1);
651         removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK, useAsync);
652         assertTrue(w1.matches(), "Didn't remove child watcher");
653         assertEquals(1, zk2.getChildWatches().size(), "Didn't find child watcher");
654         removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK, useAsync);
655         assertTrue(w2.matches(), "Didn't remove child watcher");
656     }
657 
658     /**
659      * Verify that if a given watcher doesn't exist, the server properly
660      * returns an error code for it.
661      *
662      * In our Java client implementation, we check that a given watch exists at
663      * two points:
664      *
665      * 1) before submitting the RemoveWatches request
666      * 2) after a successful server response, when the watcher needs to be
667      *    removed
668      *
669      * Since this can be racy (i.e. a watch can fire while a RemoveWatches
670      * request is in-flight), we need to verify that the watch was actually
671      * removed (i.e. from ZKDatabase and DataTree) and return NOWATCHER if
672      * needed.
673      *
674      * Also, other implementations might not do a client side check before
675      * submitting a RemoveWatches request. If we don't do a server side check,
676      * we would just return ZOK even if no watch was removed.
677      *
678      */
679     @ParameterizedTest
680     @ValueSource(booleans = {true, false})
681     @Timeout(value = 90)
testNoWatcherServerException(boolean useAsync)682     public void testNoWatcherServerException(boolean useAsync) throws InterruptedException, IOException, TimeoutException {
683         CountdownWatcher watcher = new CountdownWatcher();
684         ZooKeeper zk = spy(new ZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher));
685         MyWatchManager watchManager = new MyWatchManager(false, watcher);
686         doReturn(watchManager).when(zk).getWatchManager();
687         boolean nw = false;
688 
689         watcher.waitForConnected(CONNECTION_TIMEOUT);
690 
691         try {
692             zk.removeWatches("/nowatchhere", watcher, WatcherType.Data, false);
693         } catch (KeeperException nwe) {
694             if (nwe.code().intValue() == Code.NOWATCHER.intValue()) {
695                 nw = true;
696             }
697         }
698 
699         assertThat("Server didn't return NOWATCHER", watchManager.lastReturnCode, is(Code.NOWATCHER.intValue()));
700         assertThat("NoWatcherException didn't happen", nw, is(true));
701     }
702 
703     /**
704      * Test verifies given watcher doesn't exists!
705      */
706     @ParameterizedTest
707     @ValueSource(booleans = {true, false})
708     @Timeout(value = 90)
testRemoveAllNoWatcherException(boolean useAsync)709     public void testRemoveAllNoWatcherException(boolean useAsync) throws IOException, InterruptedException, KeeperException {
710         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
711         try {
712             removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.NOWATCHER, useAsync);
713             fail("Should throw exception as given watcher doesn't exists");
714         } catch (KeeperException.NoWatcherException nwe) {
715             // expected
716         }
717     }
718 
719     /**
720      * Test verifies null watcher
721      */
722     @ParameterizedTest
723     @ValueSource(booleans = {true, false})
724     @Timeout(value = 30)
testNullWatcherReference(boolean useAsync)725     public void testNullWatcherReference(boolean useAsync) throws Exception {
726         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
727         try {
728             if (useAsync) {
729                 zk1.removeWatches("/node1", null, WatcherType.Data, false, null, null);
730             } else {
731                 zk1.removeWatches("/node1", null, WatcherType.Data, false);
732             }
733             fail("Must throw IllegalArgumentException as watcher is null!");
734         } catch (IllegalArgumentException iae) {
735             // expected
736         }
737     }
738 
739     /**
740      * Test verifies WatcherType.Data - removes only the configured data watcher
741      * function
742      */
743     @ParameterizedTest
744     @ValueSource(booleans = {true, false})
745     @Timeout(value = 90)
testRemoveWhenMultipleDataWatchesOnAPath(boolean useAsync)746     public void testRemoveWhenMultipleDataWatchesOnAPath(boolean useAsync) throws Exception {
747         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
748         final CountDownLatch dataWatchCount = new CountDownLatch(1);
749         final CountDownLatch rmWatchCount = new CountDownLatch(1);
750         Watcher w1 = event -> {
751             if (event.getType() == EventType.DataWatchRemoved) {
752                 rmWatchCount.countDown();
753             }
754         };
755         Watcher w2 = event -> {
756             if (event.getType() == EventType.NodeDataChanged) {
757                 dataWatchCount.countDown();
758             }
759         };
760         // Add multiple data watches
761         LOG.info("Adding data watcher {} on path {}", w1, "/node1");
762         assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
763         LOG.info("Adding data watcher {} on path {}", w2, "/node1");
764         assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");
765 
766         removeWatches(zk2, "/node1", w1, WatcherType.Data, false, Code.OK, useAsync);
767         assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher");
768 
769         zk1.setData("/node1", "test".getBytes(), -1);
770         LOG.info("Waiting for data watchers to be notified");
771         assertTrue(dataWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't get data watch notification!");
772     }
773 
774     /**
775      * Test verifies WatcherType.Children - removes only the configured child
776      * watcher function
777      */
778     @ParameterizedTest
779     @ValueSource(booleans = {true, false})
780     @Timeout(value = 90)
testRemoveWhenMultipleChildWatchesOnAPath(boolean useAsync)781     public void testRemoveWhenMultipleChildWatchesOnAPath(boolean useAsync) throws Exception {
782         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
783         final CountDownLatch childWatchCount = new CountDownLatch(1);
784         final CountDownLatch rmWatchCount = new CountDownLatch(1);
785         Watcher w1 = event -> {
786             if (event.getType() == EventType.ChildWatchRemoved) {
787                 rmWatchCount.countDown();
788             }
789         };
790         Watcher w2 = event -> {
791             if (event.getType() == EventType.NodeChildrenChanged) {
792                 childWatchCount.countDown();
793             }
794         };
795         // Add multiple child watches
796         LOG.info("Adding child watcher {} on path {}", w1, "/node1");
797         assertEquals(0, zk2.getChildren("/node1", w1).size(), "Didn't set child watches");
798         LOG.info("Adding child watcher {} on path {}", w2, "/node1");
799         assertEquals(0, zk2.getChildren("/node1", w2).size(), "Didn't set child watches");
800 
801         removeWatches(zk2, "/node1", w1, WatcherType.Children, false, Code.OK, useAsync);
802         assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove child watcher");
803 
804         zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
805         LOG.info("Waiting for child watchers to be notified");
806         assertTrue(childWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't get child watch notification!");
807     }
808 
809     /**
810      * Test verifies WatcherType.Data - removes only the configured data watcher
811      * function
812      */
813     @ParameterizedTest
814     @ValueSource(booleans = {true, false})
815     @Timeout(value = 90)
testRemoveAllDataWatchesOnAPath(boolean useAsync)816     public void testRemoveAllDataWatchesOnAPath(boolean useAsync) throws Exception {
817         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
818         final CountDownLatch dWatchCount = new CountDownLatch(2);
819         final CountDownLatch rmWatchCount = new CountDownLatch(2);
820         Watcher w1 = event -> {
821             switch (event.getType()) {
822             case DataWatchRemoved:
823                 rmWatchCount.countDown();
824                 break;
825             case NodeDataChanged:
826                 dWatchCount.countDown();
827                 break;
828             default:
829                 break;
830             }
831         };
832         Watcher w2 = event -> {
833             switch (event.getType()) {
834             case DataWatchRemoved:
835                 rmWatchCount.countDown();
836                 break;
837             case NodeDataChanged:
838                 dWatchCount.countDown();
839                 break;
840             default:
841                 break;
842             }
843         };
844         // Add multiple data watches
845         LOG.info("Adding data watcher {} on path {}", w1, "/node1");
846         assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
847         LOG.info("Adding data watcher {} on path {}", w2, "/node1");
848         assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");
849 
850         assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is not a watcher");
851         removeAllWatches(zk2, "/node1", WatcherType.Data, false, Code.OK, useAsync);
852         assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher");
853 
854         assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is still a watcher after removal");
855     }
856 
857     /**
858      * Test verifies WatcherType.Children - removes only the configured child
859      * watcher function
860      */
861     @ParameterizedTest
862     @ValueSource(booleans = {true, false})
863     @Timeout(value = 90)
testRemoveAllChildWatchesOnAPath(boolean useAsync)864     public void testRemoveAllChildWatchesOnAPath(boolean useAsync) throws Exception {
865         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
866         final CountDownLatch cWatchCount = new CountDownLatch(2);
867         final CountDownLatch rmWatchCount = new CountDownLatch(2);
868         Watcher w1 = event -> {
869             switch (event.getType()) {
870             case ChildWatchRemoved:
871                 rmWatchCount.countDown();
872                 break;
873             case NodeChildrenChanged:
874                 cWatchCount.countDown();
875                 break;
876             default:
877                 break;
878             }
879         };
880         Watcher w2 = event -> {
881             switch (event.getType()) {
882             case ChildWatchRemoved:
883                 rmWatchCount.countDown();
884                 break;
885             case NodeChildrenChanged:
886                 cWatchCount.countDown();
887                 break;
888             default:
889                 break;
890             }
891         };
892         // Add multiple child watches
893         LOG.info("Adding child watcher {} on path {}", w1, "/node1");
894         assertEquals(0, zk2.getChildren("/node1", w1).size(), "Didn't set child watches");
895         LOG.info("Adding child watcher {} on path {}", w2, "/node1");
896         assertEquals(0, zk2.getChildren("/node1", w2).size(), "Didn't set child watches");
897 
898         assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is not a watcher");
899         removeAllWatches(zk2, "/node1", WatcherType.Children, false, Code.OK, useAsync);
900         assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove child watcher");
901 
902         assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is still a watcher after removal");
903     }
904 
905     /**
906      * Test verifies WatcherType.Any - removes all the configured child,data
907      * watcher functions
908      */
909     @ParameterizedTest
910     @ValueSource(booleans = {true, false})
911     @Timeout(value = 90)
testRemoveAllWatchesOnAPath(boolean useAsync)912     public void testRemoveAllWatchesOnAPath(boolean useAsync) throws Exception {
913         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
914         final CountDownLatch watchCount = new CountDownLatch(2);
915         final CountDownLatch rmWatchCount = new CountDownLatch(4);
916         Watcher w1 = event -> {
917             switch (event.getType()) {
918             case ChildWatchRemoved:
919             case DataWatchRemoved:
920                 rmWatchCount.countDown();
921                 break;
922             case NodeChildrenChanged:
923             case NodeDataChanged:
924                 watchCount.countDown();
925                 break;
926             default:
927                 break;
928             }
929         };
930         Watcher w2 = event -> {
931             switch (event.getType()) {
932             case ChildWatchRemoved:
933             case DataWatchRemoved:
934                 rmWatchCount.countDown();
935                 break;
936             case NodeChildrenChanged:
937             case NodeDataChanged:
938                 watchCount.countDown();
939                 break;
940             default:
941                 break;
942             }
943         };
944         // Add multiple child watches
945         LOG.info("Adding child watcher {} on path {}", w1, "/node1");
946         assertEquals(0, zk2.getChildren("/node1", w1).size(), "Didn't set child watches");
947         LOG.info("Adding child watcher {} on path {}", w2, "/node1");
948         assertEquals(0, zk2.getChildren("/node1", w2).size(), "Didn't set child watches");
949 
950         // Add multiple data watches
951         LOG.info("Adding data watcher {} on path {}", w1, "/node1");
952         assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
953         LOG.info("Adding data watcher {} on path {}", w2, "/node1");
954         assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");
955 
956         assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is not a watcher");
957         removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.OK, useAsync);
958         assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher");
959         assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is still a watcher after removal");
960         assertEquals(2, watchCount.getCount(), "Received watch notification after removal!");
961     }
962 
963     private static class MyWatchManager extends ZKWatchManager {
964 
965         int lastReturnCode;
966 
MyWatchManager(boolean disableAutoWatchReset, Watcher defaultWatcher)967         MyWatchManager(boolean disableAutoWatchReset, Watcher defaultWatcher) {
968             super(disableAutoWatchReset, defaultWatcher);
969         }
970 
containsWatcher(String path, Watcher watcher, WatcherType watcherType)971         void containsWatcher(String path, Watcher watcher, WatcherType watcherType) {
972             // prevent contains watcher
973         }
974 
975         @Override
removeWatches( Map<String, Set<Watcher>> pathVsWatcher, Watcher watcher, String path, boolean local, int rc, Set<Watcher> removedWatchers)976         protected boolean removeWatches(
977             Map<String, Set<Watcher>> pathVsWatcher,
978             Watcher watcher,
979             String path,
980             boolean local,
981             int rc,
982             Set<Watcher> removedWatchers) {
983             lastReturnCode = rc;
984             return false;
985         }
986     }
987 
988     private static class MyWatcher implements Watcher {
989 
990         private final String path;
991         private String eventPath;
992         private CountDownLatch latch;
993         private List<EventType> eventsAfterWatchRemoval = new ArrayList<EventType>();
MyWatcher(String path, int count)994         MyWatcher(String path, int count) {
995             this.path = path;
996             latch = new CountDownLatch(count);
997         }
998 
process(WatchedEvent event)999         public void process(WatchedEvent event) {
1000             LOG.debug("Event path : {}, eventPath : {}", path, event.getPath());
1001             this.eventPath = event.getPath();
1002             // notifies watcher removal
1003             if (latch.getCount() == 0) {
1004                 if (event.getType() != EventType.None) {
1005                     eventsAfterWatchRemoval.add(event.getType());
1006                 }
1007             }
1008             if (event.getType() == EventType.ChildWatchRemoved || event.getType() == EventType.DataWatchRemoved) {
1009                 latch.countDown();
1010             }
1011         }
1012 
1013         /**
1014          * Returns true if the watcher was triggered.  Try to avoid using this
1015          * method with assertFalse statements.  A false return depends on a timed
1016          * out wait on a latch, which makes tests run long.
1017          *
1018          * @return true if the watcher was triggered, false otherwise
1019          * @throws InterruptedException if interrupted while waiting on latch
1020          */
matches()1021         public boolean matches() throws InterruptedException {
1022             if (!latch.await(CONNECTION_TIMEOUT / 5, TimeUnit.MILLISECONDS)) {
1023                 LOG.error("Failed waiting to remove the watches");
1024                 return false;
1025             }
1026             LOG.debug("Client path : {} eventPath : {}", path, eventPath);
1027             return path.equals(eventPath);
1028         }
1029 
getEventsAfterWatchRemoval()1030         public List<EventType> getEventsAfterWatchRemoval() {
1031             return eventsAfterWatchRemoval;
1032         }
1033 
1034     }
1035 
1036     private class MyCallback implements AsyncCallback.VoidCallback {
1037 
1038         private final String path;
1039         private final int rc;
1040         private String eventPath;
1041         int eventRc;
1042         private CountDownLatch latch = new CountDownLatch(1);
1043 
MyCallback(int rc, String path)1044         public MyCallback(int rc, String path) {
1045             this.rc = rc;
1046             this.path = path;
1047         }
1048 
1049         @Override
processResult(int rc, String eventPath, Object ctx)1050         public void processResult(int rc, String eventPath, Object ctx) {
1051             System.out.println("latch:" + path + " " + eventPath);
1052             this.eventPath = eventPath;
1053             this.eventRc = rc;
1054             this.latch.countDown();
1055         }
1056 
1057         /**
1058          * Returns true if the callback was triggered.  Try to avoid using this
1059          * method with assertFalse statements.  A false return depends on a timed
1060          * out wait on a latch, which makes tests run long.
1061          *
1062          * @return true if the watcher was triggered, false otherwise
1063          * @throws InterruptedException if interrupted while waiting on latch
1064          */
matches()1065         public boolean matches() throws InterruptedException {
1066             if (!latch.await(CONNECTION_TIMEOUT / 5, TimeUnit.MILLISECONDS)) {
1067                 return false;
1068             }
1069             return path.equals(eventPath) && rc == eventRc;
1070         }
1071 
1072     }
1073 
1074     /**
1075      * Checks if a session is registered with the server as a watcher.
1076      *
1077      * @param sessionId the session ID to check
1078      * @param path the path to check for watchers
1079      * @param type the type of watcher
1080      * @return true if the client session is a watcher on path for the type
1081      */
isServerSessionWatcher(long sessionId, String path, WatcherType type)1082     private boolean isServerSessionWatcher(long sessionId, String path, WatcherType type) {
1083         Set<ServerCnxn> cnxns = new HashSet<>();
1084         CollectionUtils.addAll(cnxns, serverFactory.getConnections().iterator());
1085         for (ServerCnxn cnxn : cnxns) {
1086             if (cnxn.getSessionId() == sessionId) {
1087                 return serverFactory.getZooKeeperServer().getZKDatabase().getDataTree().containsWatcher(path, type, cnxn);
1088             }
1089         }
1090         return false;
1091     }
1092 
1093 }
1094