1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.zookeeper; 20 21 import static org.hamcrest.CoreMatchers.is; 22 import static org.hamcrest.MatcherAssert.assertThat; 23 import static org.junit.jupiter.api.Assertions.assertEquals; 24 import static org.junit.jupiter.api.Assertions.assertFalse; 25 import static org.junit.jupiter.api.Assertions.assertNotNull; 26 import static org.junit.jupiter.api.Assertions.assertNull; 27 import static org.junit.jupiter.api.Assertions.assertTrue; 28 import static org.junit.jupiter.api.Assertions.fail; 29 import static org.mockito.Mockito.doReturn; 30 import static org.mockito.Mockito.spy; 31 import java.io.IOException; 32 import java.util.ArrayList; 33 import java.util.HashSet; 34 import java.util.List; 35 import java.util.Map; 36 import java.util.Set; 37 import java.util.concurrent.CountDownLatch; 38 import java.util.concurrent.TimeUnit; 39 import java.util.concurrent.TimeoutException; 40 import org.apache.commons.collections4.CollectionUtils; 41 import org.apache.zookeeper.KeeperException.Code; 42 import org.apache.zookeeper.Watcher.Event.EventType; 43 import org.apache.zookeeper.Watcher.WatcherType; 44 import org.apache.zookeeper.ZooDefs.Ids; 45 import org.apache.zookeeper.server.ServerCnxn; 46 import org.apache.zookeeper.test.ClientBase; 47 import org.junit.jupiter.api.AfterEach; 48 import org.junit.jupiter.api.BeforeEach; 49 import org.junit.jupiter.api.Timeout; 50 import org.junit.jupiter.params.ParameterizedTest; 51 import org.junit.jupiter.params.provider.ValueSource; 52 import org.slf4j.Logger; 53 import org.slf4j.LoggerFactory; 54 55 /** 56 * Verifies removing watches using ZooKeeper client apis 57 */ 58 public class RemoveWatchesTest extends ClientBase { 59 60 private static final Logger LOG = LoggerFactory.getLogger(RemoveWatchesTest.class); 61 private ZooKeeper zk1 = null; 62 private ZooKeeper zk2 = null; 63 64 @BeforeEach 65 @Override setUp()66 public void setUp() throws Exception { 67 super.setUp(); 68 zk1 = createClient(); 69 zk2 = createClient(); 70 } 71 72 @AfterEach 73 @Override tearDown()74 public void tearDown() throws Exception { 75 if (zk1 != null) { 76 zk1.close(); 77 } 78 if (zk2 != null) { 79 zk2.close(); 80 } 81 super.tearDown(); 82 } 83 removeWatches( ZooKeeper zk, String path, Watcher watcher, WatcherType watcherType, boolean local, KeeperException.Code rc, boolean useAsync)84 private void removeWatches( 85 ZooKeeper zk, 86 String path, 87 Watcher watcher, 88 WatcherType watcherType, 89 boolean local, 90 KeeperException.Code rc, 91 boolean useAsync) throws InterruptedException, KeeperException { 92 LOG.info("Sending removeWatches req using zk {} path: {} type: {} watcher: {} ", zk, path, watcherType, watcher); 93 if (useAsync) { 94 MyCallback c1 = new MyCallback(rc.intValue(), path); 95 zk.removeWatches(path, watcher, watcherType, local, c1, null); 96 assertTrue(c1.matches(), "Didn't succeeds removeWatch operation"); 97 if (KeeperException.Code.OK.intValue() != c1.rc) { 98 KeeperException ke = KeeperException.create(KeeperException.Code.get(c1.rc)); 99 throw ke; 100 } 101 } else { 102 zk.removeWatches(path, watcher, watcherType, local); 103 } 104 } 105 removeAllWatches( ZooKeeper zk, String path, WatcherType watcherType, boolean local, KeeperException.Code rc, boolean useAsync)106 private void removeAllWatches( 107 ZooKeeper zk, 108 String path, 109 WatcherType watcherType, 110 boolean local, 111 KeeperException.Code rc, 112 boolean useAsync) throws InterruptedException, KeeperException { 113 LOG.info("Sending removeWatches req using zk {} path: {} type: {} ", zk, path, watcherType); 114 if (useAsync) { 115 MyCallback c1 = new MyCallback(rc.intValue(), path); 116 zk.removeAllWatches(path, watcherType, local, c1, null); 117 assertTrue(c1.matches(), "Didn't succeeds removeWatch operation"); 118 if (KeeperException.Code.OK.intValue() != c1.rc) { 119 KeeperException ke = KeeperException.create(KeeperException.Code.get(c1.rc)); 120 throw ke; 121 } 122 } else { 123 zk.removeAllWatches(path, watcherType, local); 124 } 125 } 126 127 /** 128 * Test verifies removal of single watcher when there is server connection 129 */ 130 @ParameterizedTest 131 @ValueSource(booleans = {true, false}) 132 @Timeout(value = 90) testRemoveSingleWatcher(boolean useAsync)133 public void testRemoveSingleWatcher(boolean useAsync) throws Exception { 134 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); 135 zk1.create("/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); 136 MyWatcher w1 = new MyWatcher("/node1", 1); 137 LOG.info("Adding data watcher {} on path {}", w1, "/node1"); 138 assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches"); 139 MyWatcher w2 = new MyWatcher("/node2", 1); 140 LOG.info("Adding data watcher {} on path {}", w2, "/node1"); 141 assertNotNull(zk2.exists("/node2", w2), "Didn't set data watches"); 142 removeWatches(zk2, "/node1", w1, WatcherType.Data, false, Code.OK, useAsync); 143 assertEquals(1, zk2.getDataWatches().size(), "Didn't find data watcher"); 144 assertEquals("/node2", zk2.getDataWatches().get(0), "Didn't find data watcher"); 145 removeWatches(zk2, "/node2", w2, WatcherType.Any, false, Code.OK, useAsync); 146 assertTrue(w2.matches(), "Didn't remove data watcher"); 147 // closing session should remove ephemeral nodes and trigger data 148 // watches if any 149 if (zk1 != null) { 150 zk1.close(); 151 zk1 = null; 152 } 153 154 List<EventType> events = w1.getEventsAfterWatchRemoval(); 155 assertFalse(events.contains(EventType.NodeDeleted), "Shouldn't get NodeDeletedEvent after watch removal"); 156 assertEquals(0, events.size(), "Shouldn't get NodeDeletedEvent after watch removal"); 157 } 158 159 /** 160 * Test verifies removal of multiple data watchers when there is server 161 * connection 162 */ 163 @ParameterizedTest 164 @ValueSource(booleans = {true, false}) 165 @Timeout(value = 90) testMultipleDataWatchers(boolean useAsync)166 public void testMultipleDataWatchers(boolean useAsync) throws IOException, InterruptedException, KeeperException { 167 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); 168 MyWatcher w1 = new MyWatcher("/node1", 1); 169 LOG.info("Adding data watcher {} on path {}", w1, "/node1"); 170 assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches"); 171 MyWatcher w2 = new MyWatcher("/node1", 1); 172 LOG.info("Adding data watcher {} on path {}", w2, "/node1"); 173 assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches"); 174 removeWatches(zk2, "/node1", w2, WatcherType.Data, false, Code.OK, useAsync); 175 assertEquals(1, zk2.getDataWatches().size(), "Didn't find data watcher"); 176 assertEquals("/node1", zk2.getDataWatches().get(0), "Didn't find data watcher"); 177 removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK, useAsync); 178 assertTrue(w2.matches(), "Didn't remove data watcher"); 179 // closing session should remove ephemeral nodes and trigger data 180 // watches if any 181 if (zk1 != null) { 182 zk1.close(); 183 zk1 = null; 184 } 185 186 List<EventType> events = w2.getEventsAfterWatchRemoval(); 187 assertEquals(0, events.size(), "Shouldn't get NodeDeletedEvent after watch removal"); 188 } 189 190 /** 191 * Test verifies removal of multiple child watchers when there is server 192 * connection 193 */ 194 @ParameterizedTest 195 @ValueSource(booleans = {true, false}) 196 @Timeout(value = 90) testMultipleChildWatchers(boolean useAsync)197 public void testMultipleChildWatchers(boolean useAsync) throws IOException, InterruptedException, KeeperException { 198 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 199 MyWatcher w1 = new MyWatcher("/node1", 1); 200 LOG.info("Adding child watcher {} on path {}", w1, "/node1"); 201 zk2.getChildren("/node1", w1); 202 MyWatcher w2 = new MyWatcher("/node1", 1); 203 LOG.info("Adding child watcher {} on path {}", w2, "/node1"); 204 zk2.getChildren("/node1", w2); 205 removeWatches(zk2, "/node1", w2, WatcherType.Children, false, Code.OK, useAsync); 206 assertTrue(w2.matches(), "Didn't remove child watcher"); 207 assertEquals(1, zk2.getChildWatches().size(), "Didn't find child watcher"); 208 removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK, useAsync); 209 assertTrue(w1.matches(), "Didn't remove child watcher"); 210 // create child to see NodeChildren notification 211 zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 212 // waiting for child watchers to be notified 213 int count = 30; 214 while (count > 0) { 215 if (w1.getEventsAfterWatchRemoval().size() > 0) { 216 break; 217 } 218 count--; 219 Thread.sleep(100); 220 } 221 // watcher2 222 List<EventType> events = w2.getEventsAfterWatchRemoval(); 223 assertEquals(0, events.size(), "Shouldn't get NodeChildrenChanged event"); 224 } 225 226 /** 227 * Test verifies null watcher with WatcherType.Any - remove all the watchers 228 * data, child, exists 229 */ 230 @ParameterizedTest 231 @ValueSource(booleans = {true, false}) 232 @Timeout(value = 90) testRemoveAllWatchers(boolean useAsync)233 public void testRemoveAllWatchers(boolean useAsync) throws IOException, InterruptedException, KeeperException { 234 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 235 MyWatcher w1 = new MyWatcher("/node1", 2); 236 MyWatcher w2 = new MyWatcher("/node1", 2); 237 LOG.info("Adding data watcher {} on path {}", w1, "/node1"); 238 assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches"); 239 LOG.info("Adding data watcher {} on path {}", w2, "/node1"); 240 assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches"); 241 LOG.info("Adding child watcher {} on path {}", w1, "/node1"); 242 zk2.getChildren("/node1", w1); 243 LOG.info("Adding child watcher {} on path {}", w2, "/node1"); 244 zk2.getChildren("/node1", w2); 245 removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK, useAsync); 246 removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK, useAsync); 247 zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); 248 assertTrue(w1.matches(), "Didn't remove data watcher"); 249 assertTrue(w2.matches(), "Didn't remove child watcher"); 250 } 251 252 /** 253 * Test verifies null watcher with WatcherType.Data - remove all data 254 * watchers. Child watchers shouldn't be removed 255 */ 256 @ParameterizedTest 257 @ValueSource(booleans = {true, false}) 258 @Timeout(value = 90) testRemoveAllDataWatchers(boolean useAsync)259 public void testRemoveAllDataWatchers(boolean useAsync) throws IOException, InterruptedException, KeeperException { 260 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 261 MyWatcher w1 = new MyWatcher("/node1", 1); 262 MyWatcher w2 = new MyWatcher("/node1", 1); 263 LOG.info("Adding data watcher {} on path {}", w1, "/node1"); 264 assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches"); 265 LOG.info("Adding data watcher {} on path {}", w2, "/node1"); 266 assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches"); 267 LOG.info("Adding child watcher {} on path {}", w1, "/node1"); 268 zk2.getChildren("/node1", w1); 269 LOG.info("Adding child watcher {} on path {}", w2, "/node1"); 270 zk2.getChildren("/node1", w2); 271 removeWatches(zk2, "/node1", w1, WatcherType.Data, false, Code.OK, useAsync); 272 removeWatches(zk2, "/node1", w2, WatcherType.Data, false, Code.OK, useAsync); 273 zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); 274 assertTrue(w1.matches(), "Didn't remove data watcher"); 275 assertTrue(w2.matches(), "Didn't remove data watcher"); 276 // waiting for child watchers to be notified 277 int count = 10; 278 while (count > 0) { 279 if (w1.getEventsAfterWatchRemoval().size() > 0 && w2.getEventsAfterWatchRemoval().size() > 0) { 280 break; 281 } 282 count--; 283 Thread.sleep(1000); 284 } 285 // watcher1 286 List<EventType> events = w1.getEventsAfterWatchRemoval(); 287 assertEquals(1, events.size(), "Didn't get NodeChildrenChanged event"); 288 assertTrue(events.contains(EventType.NodeChildrenChanged), "Didn't get NodeChildrenChanged event"); 289 // watcher2 290 events = w2.getEventsAfterWatchRemoval(); 291 assertEquals(1, events.size(), "Didn't get NodeChildrenChanged event"); 292 assertTrue(events.contains(EventType.NodeChildrenChanged), "Didn't get NodeChildrenChanged event"); 293 } 294 295 /** 296 * Test verifies null watcher with WatcherType.Children - remove all child 297 * watchers. Data watchers shouldn't be removed 298 */ 299 @ParameterizedTest 300 @ValueSource(booleans = {true, false}) 301 @Timeout(value = 90) testRemoveAllChildWatchers(boolean useAsync)302 public void testRemoveAllChildWatchers(boolean useAsync) throws IOException, InterruptedException, KeeperException { 303 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 304 MyWatcher w1 = new MyWatcher("/node1", 1); 305 MyWatcher w2 = new MyWatcher("/node1", 1); 306 LOG.info("Adding data watcher {} on path {}", w1, "/node1"); 307 assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches"); 308 LOG.info("Adding data watcher {} on path {}", w2, "/node1"); 309 assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches"); 310 LOG.info("Adding child watcher {} on path {}", w1, "/node1"); 311 zk2.getChildren("/node1", w1); 312 LOG.info("Adding child watcher {} on path {}", w2, "/node1"); 313 zk2.getChildren("/node1", w2); 314 removeWatches(zk2, "/node1", w1, WatcherType.Children, false, Code.OK, useAsync); 315 removeWatches(zk2, "/node1", w2, WatcherType.Children, false, Code.OK, useAsync); 316 zk1.setData("/node1", "test".getBytes(), -1); 317 assertTrue(w1.matches(), "Didn't remove child watcher"); 318 assertTrue(w2.matches(), "Didn't remove child watcher"); 319 // waiting for child watchers to be notified 320 int count = 10; 321 while (count > 0) { 322 if (w1.getEventsAfterWatchRemoval().size() > 0 && w2.getEventsAfterWatchRemoval().size() > 0) { 323 break; 324 } 325 count--; 326 Thread.sleep(1000); 327 } 328 // watcher1 329 List<EventType> events = w1.getEventsAfterWatchRemoval(); 330 assertEquals(1, events.size(), "Didn't get NodeDataChanged event"); 331 assertTrue(events.contains(EventType.NodeDataChanged), "Didn't get NodeDataChanged event"); 332 // watcher2 333 events = w2.getEventsAfterWatchRemoval(); 334 assertEquals(1, events.size(), "Didn't get NodeDataChanged event"); 335 assertTrue(events.contains(EventType.NodeDataChanged), "Didn't get NodeDataChanged event"); 336 } 337 338 /** 339 * Test verifies given watcher doesn't exists! 340 */ 341 @ParameterizedTest 342 @ValueSource(booleans = {true, false}) 343 @Timeout(value = 90) testNoWatcherException(boolean useAsync)344 public void testNoWatcherException(boolean useAsync) throws IOException, InterruptedException, KeeperException { 345 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 346 MyWatcher w1 = new MyWatcher("/node1", 2); 347 MyWatcher w2 = new MyWatcher("/node1", 2); 348 LOG.info("Adding data watcher {} on path {}", w1, "/node1"); 349 assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches"); 350 LOG.info("Adding data watcher {} on path {}", w2, "/node1"); 351 assertNull(zk2.exists("/node2", w2), "Didn't set data watches"); 352 LOG.info("Adding child watcher {} on path {}", w1, "/node1"); 353 zk2.getChildren("/node1", w1); 354 LOG.info("Adding child watcher {} on path {}", w2, "/node1"); 355 zk2.getChildren("/node1", w2); 356 357 // New Watcher which will be used for removal 358 MyWatcher w3 = new MyWatcher("/node1", 2); 359 360 try { 361 removeWatches(zk2, "/node1", w3, WatcherType.Any, false, Code.NOWATCHER, useAsync); 362 fail("Should throw exception as given watcher doesn't exists"); 363 } catch (KeeperException.NoWatcherException nwe) { 364 // expected 365 } 366 try { 367 removeWatches(zk2, "/node1", w3, WatcherType.Children, false, Code.NOWATCHER, useAsync); 368 fail("Should throw exception as given watcher doesn't exists"); 369 } catch (KeeperException.NoWatcherException nwe) { 370 // expected 371 } 372 try { 373 removeWatches(zk2, "/node1", w3, WatcherType.Data, false, Code.NOWATCHER, useAsync); 374 fail("Should throw exception as given watcher doesn't exists"); 375 } catch (KeeperException.NoWatcherException nwe) { 376 // expected 377 } 378 try { 379 removeWatches(zk2, "/nonexists", w3, WatcherType.Data, false, Code.NOWATCHER, useAsync); 380 fail("Should throw exception as given watcher doesn't exists"); 381 } catch (KeeperException.NoWatcherException nwe) { 382 // expected 383 } 384 } 385 386 /** 387 * Test verifies WatcherType.Any - removes only the configured data watcher 388 * function 389 */ 390 @ParameterizedTest 391 @ValueSource(booleans = {true, false}) 392 @Timeout(value = 90) testRemoveAnyDataWatcher(boolean useAsync)393 public void testRemoveAnyDataWatcher(boolean useAsync) throws Exception { 394 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 395 MyWatcher w1 = new MyWatcher("/node1", 1); 396 MyWatcher w2 = new MyWatcher("/node1", 2); 397 // Add multiple data watches 398 LOG.info("Adding data watcher {} on path {}", w1, "/node1"); 399 assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches"); 400 LOG.info("Adding data watcher {} on path {}", w2, "/node1"); 401 assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches"); 402 // Add child watch 403 LOG.info("Adding child watcher {} on path {}", w2, "/node1"); 404 zk2.getChildren("/node1", w2); 405 removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK, useAsync); 406 assertTrue(w1.matches(), "Didn't remove data watcher"); 407 assertEquals(1, zk2.getChildWatches().size(), "Didn't find child watcher"); 408 assertEquals(1, zk2.getDataWatches().size(), "Didn't find data watcher"); 409 removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK, useAsync); 410 assertTrue(w2.matches(), "Didn't remove child watcher"); 411 } 412 413 /** 414 * Test verifies WatcherType.Any - removes only the configured child watcher 415 * function 416 */ 417 @ParameterizedTest 418 @ValueSource(booleans = {true, false}) 419 @Timeout(value = 90) testRemoveAnyChildWatcher(boolean useAsync)420 public void testRemoveAnyChildWatcher(boolean useAsync) throws Exception { 421 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 422 MyWatcher w1 = new MyWatcher("/node1", 2); 423 MyWatcher w2 = new MyWatcher("/node1", 1); 424 LOG.info("Adding data watcher {} on path {}", w1, "/node1"); 425 assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches"); 426 // Add multiple child watches 427 LOG.info("Adding child watcher {} on path {}", w1, "/node1"); 428 zk2.getChildren("/node1", w2); 429 LOG.info("Adding child watcher {} on path {}", w2, "/node1"); 430 zk2.getChildren("/node1", w1); 431 removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK, useAsync); 432 assertTrue(w2.matches(), "Didn't remove child watcher"); 433 assertEquals(1, zk2.getChildWatches().size(), "Didn't find child watcher"); 434 assertEquals(1, zk2.getDataWatches().size(), "Didn't find data watcher"); 435 removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK, useAsync); 436 assertTrue(w1.matches(), "Didn't remove watchers"); 437 } 438 439 /** 440 * Test verifies when there is no server connection. Remove watches when 441 * local=true, otw should retain it 442 */ 443 @ParameterizedTest 444 @ValueSource(booleans = {true, false}) 445 @Timeout(value = 90) testRemoveWatcherWhenNoConnection(boolean useAsync)446 public void testRemoveWatcherWhenNoConnection(boolean useAsync) throws Exception { 447 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 448 MyWatcher w1 = new MyWatcher("/node1", 2); 449 MyWatcher w2 = new MyWatcher("/node1", 1); 450 LOG.info("Adding data watcher {} on path {}", w1, "/node1"); 451 assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches"); 452 // Add multiple child watches 453 LOG.info("Adding child watcher {} on path {}", w1, "/node1"); 454 zk2.getChildren("/node1", w1); 455 LOG.info("Adding child watcher {} on path {}", w1, "/node1"); 456 zk2.getChildren("/node1", w2); 457 stopServer(); 458 removeWatches(zk2, "/node1", w2, WatcherType.Any, true, Code.OK, useAsync); 459 assertTrue(w2.matches(), "Didn't remove child watcher"); 460 assertFalse(w1.matches(), "Shouldn't remove data watcher"); 461 try { 462 removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.CONNECTIONLOSS, useAsync); 463 fail("Should throw exception as last watch removal requires server connection"); 464 } catch (KeeperException.ConnectionLossException nwe) { 465 // expected 466 } 467 assertFalse(w1.matches(), "Shouldn't remove data watcher"); 468 469 // when local=true, here if connection not available, simply removes 470 // from local session 471 removeWatches(zk2, "/node1", w1, WatcherType.Any, true, Code.OK, useAsync); 472 assertTrue(w1.matches(), "Didn't remove data watcher"); 473 } 474 475 /** 476 * Test verifies many pre-node watchers. Also, verifies internal 477 * datastructure 'watchManager.existWatches' 478 */ 479 @ParameterizedTest 480 @ValueSource(booleans = {true, false}) 481 @Timeout(value = 90) testManyPreNodeWatchers(boolean useAsync)482 public void testManyPreNodeWatchers(boolean useAsync) throws Exception { 483 int count = 50; 484 List<MyWatcher> wList = new ArrayList<MyWatcher>(count); 485 MyWatcher w; 486 String path = "/node"; 487 // Exists watcher 488 for (int i = 0; i < count; i++) { 489 final String nodePath = path + i; 490 w = new MyWatcher(nodePath, 1); 491 wList.add(w); 492 LOG.info("Adding pre node watcher {} on path {}", w, nodePath); 493 zk1.exists(nodePath, w); 494 } 495 assertEquals(count, zk1.getExistWatches().size(), "Failed to add watchers!"); 496 for (int i = 0; i < count; i++) { 497 final MyWatcher watcher = wList.get(i); 498 removeWatches(zk1, path + i, watcher, WatcherType.Data, false, Code.OK, useAsync); 499 assertTrue(watcher.matches(), "Didn't remove data watcher"); 500 } 501 assertEquals(0, zk1.getExistWatches().size(), "Didn't remove watch references!"); 502 } 503 504 /** 505 * Test verifies many child watchers. Also, verifies internal datastructure 506 * 'watchManager.childWatches' 507 */ 508 @ParameterizedTest 509 @ValueSource(booleans = {true, false}) 510 @Timeout(value = 90) testManyChildWatchers(boolean useAsync)511 public void testManyChildWatchers(boolean useAsync) throws Exception { 512 int count = 50; 513 List<MyWatcher> wList = new ArrayList<MyWatcher>(count); 514 MyWatcher w; 515 String path = "/node"; 516 517 // Child watcher 518 for (int i = 0; i < count; i++) { 519 String nodePath = path + i; 520 zk1.create(nodePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 521 nodePath += "/"; 522 } 523 for (int i = 0; i < count; i++) { 524 String nodePath = path + i; 525 w = new MyWatcher(path + i, 1); 526 wList.add(w); 527 LOG.info("Adding child watcher {} on path {}", w, nodePath); 528 zk1.getChildren(nodePath, w); 529 nodePath += "/"; 530 } 531 assertEquals(count, zk1.getChildWatches().size(), "Failed to add watchers!"); 532 for (int i = 0; i < count; i++) { 533 final MyWatcher watcher = wList.get(i); 534 removeWatches(zk1, path + i, watcher, WatcherType.Children, false, Code.OK, useAsync); 535 assertTrue(watcher.matches(), "Didn't remove child watcher"); 536 } 537 assertEquals(0, zk1.getChildWatches().size(), "Didn't remove watch references!"); 538 } 539 540 /** 541 * Test verifies many data watchers. Also, verifies internal datastructure 542 * 'watchManager.dataWatches' 543 */ 544 @ParameterizedTest 545 @ValueSource(booleans = {true, false}) 546 @Timeout(value = 90) testManyDataWatchers(boolean useAsync)547 public void testManyDataWatchers(boolean useAsync) throws Exception { 548 int count = 50; 549 List<MyWatcher> wList = new ArrayList<MyWatcher>(count); 550 MyWatcher w; 551 String path = "/node"; 552 553 // Data watcher 554 for (int i = 0; i < count; i++) { 555 String nodePath = path + i; 556 w = new MyWatcher(path + i, 1); 557 wList.add(w); 558 zk1.create(nodePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 559 LOG.info("Adding data watcher {} on path {}", w, nodePath); 560 zk1.getData(nodePath, w, null); 561 nodePath += "/"; 562 } 563 assertEquals(count, zk1.getDataWatches().size(), "Failed to add watchers!"); 564 for (int i = 0; i < count; i++) { 565 final MyWatcher watcher = wList.get(i); 566 removeWatches(zk1, path + i, watcher, WatcherType.Data, false, Code.OK, useAsync); 567 assertTrue(watcher.matches(), "Didn't remove data watcher"); 568 } 569 assertEquals(0, zk1.getDataWatches().size(), "Didn't remove watch references!"); 570 } 571 572 /** 573 * Test verifies removal of many watchers locally when no connection and 574 * WatcherType#Any. Also, verifies internal watchManager datastructures 575 */ 576 @ParameterizedTest 577 @ValueSource(booleans = {true, false}) 578 @Timeout(value = 90) testManyWatchersWhenNoConnection(boolean useAsync)579 public void testManyWatchersWhenNoConnection(boolean useAsync) throws Exception { 580 int count = 3; 581 List<MyWatcher> wList = new ArrayList<MyWatcher>(count); 582 MyWatcher w; 583 String path = "/node"; 584 585 // Child watcher 586 for (int i = 0; i < count; i++) { 587 String nodePath = path + i; 588 zk1.create(nodePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 589 nodePath += "/"; 590 } 591 for (int i = 0; i < count; i++) { 592 String nodePath = path + i; 593 w = new MyWatcher(path + i, 2); 594 wList.add(w); 595 LOG.info("Adding child watcher {} on path {}", w, nodePath); 596 zk1.getChildren(nodePath, w); 597 nodePath += "/"; 598 } 599 assertEquals(count, zk1.getChildWatches().size(), "Failed to add watchers!"); 600 601 // Data watcher 602 for (int i = 0; i < count; i++) { 603 String nodePath = path + i; 604 w = wList.get(i); 605 LOG.info("Adding data watcher {} on path {}", w, nodePath); 606 zk1.getData(nodePath, w, null); 607 nodePath += "/"; 608 } 609 assertEquals(count, zk1.getDataWatches().size(), "Failed to add watchers!"); 610 stopServer(); 611 for (int i = 0; i < count; i++) { 612 final MyWatcher watcher = wList.get(i); 613 removeWatches(zk1, path + i, watcher, WatcherType.Any, true, Code.OK, useAsync); 614 assertTrue(watcher.matches(), "Didn't remove watcher"); 615 } 616 assertEquals(0, zk1.getChildWatches().size(), "Didn't remove watch references!"); 617 assertEquals(0, zk1.getDataWatches().size(), "Didn't remove watch references!"); 618 } 619 620 /** 621 * Test verifies removing watcher having namespace 622 */ 623 @ParameterizedTest 624 @ValueSource(booleans = {true, false}) 625 @Timeout(value = 90) testChRootRemoveWatcher(boolean useAsync)626 public void testChRootRemoveWatcher(boolean useAsync) throws Exception { 627 // creating the subtree for chRoot clients. 628 String chRoot = "/appsX"; 629 zk1.create("/appsX", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 630 if (zk1 != null) { 631 zk1.close(); 632 } 633 if (zk2 != null) { 634 zk2.close(); 635 } 636 // Creating chRoot client. 637 zk1 = createClient(this.hostPort + chRoot); 638 zk2 = createClient(this.hostPort + chRoot); 639 640 LOG.info("Creating child znode /node1 using chRoot client"); 641 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 642 MyWatcher w1 = new MyWatcher("/node1", 2); 643 MyWatcher w2 = new MyWatcher("/node1", 1); 644 LOG.info("Adding data watcher {} on path {}", w1, "/node1"); 645 assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches"); 646 // Add multiple child watches 647 LOG.info("Adding child watcher {} on path {}", w1, "/node1"); 648 zk2.getChildren("/node1", w2); 649 LOG.info("Adding child watcher {} on path {}", w2, "/node1"); 650 zk2.getChildren("/node1", w1); 651 removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK, useAsync); 652 assertTrue(w1.matches(), "Didn't remove child watcher"); 653 assertEquals(1, zk2.getChildWatches().size(), "Didn't find child watcher"); 654 removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK, useAsync); 655 assertTrue(w2.matches(), "Didn't remove child watcher"); 656 } 657 658 /** 659 * Verify that if a given watcher doesn't exist, the server properly 660 * returns an error code for it. 661 * 662 * In our Java client implementation, we check that a given watch exists at 663 * two points: 664 * 665 * 1) before submitting the RemoveWatches request 666 * 2) after a successful server response, when the watcher needs to be 667 * removed 668 * 669 * Since this can be racy (i.e. a watch can fire while a RemoveWatches 670 * request is in-flight), we need to verify that the watch was actually 671 * removed (i.e. from ZKDatabase and DataTree) and return NOWATCHER if 672 * needed. 673 * 674 * Also, other implementations might not do a client side check before 675 * submitting a RemoveWatches request. If we don't do a server side check, 676 * we would just return ZOK even if no watch was removed. 677 * 678 */ 679 @ParameterizedTest 680 @ValueSource(booleans = {true, false}) 681 @Timeout(value = 90) testNoWatcherServerException(boolean useAsync)682 public void testNoWatcherServerException(boolean useAsync) throws InterruptedException, IOException, TimeoutException { 683 CountdownWatcher watcher = new CountdownWatcher(); 684 ZooKeeper zk = spy(new ZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher)); 685 MyWatchManager watchManager = new MyWatchManager(false, watcher); 686 doReturn(watchManager).when(zk).getWatchManager(); 687 boolean nw = false; 688 689 watcher.waitForConnected(CONNECTION_TIMEOUT); 690 691 try { 692 zk.removeWatches("/nowatchhere", watcher, WatcherType.Data, false); 693 } catch (KeeperException nwe) { 694 if (nwe.code().intValue() == Code.NOWATCHER.intValue()) { 695 nw = true; 696 } 697 } 698 699 assertThat("Server didn't return NOWATCHER", watchManager.lastReturnCode, is(Code.NOWATCHER.intValue())); 700 assertThat("NoWatcherException didn't happen", nw, is(true)); 701 } 702 703 /** 704 * Test verifies given watcher doesn't exists! 705 */ 706 @ParameterizedTest 707 @ValueSource(booleans = {true, false}) 708 @Timeout(value = 90) testRemoveAllNoWatcherException(boolean useAsync)709 public void testRemoveAllNoWatcherException(boolean useAsync) throws IOException, InterruptedException, KeeperException { 710 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 711 try { 712 removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.NOWATCHER, useAsync); 713 fail("Should throw exception as given watcher doesn't exists"); 714 } catch (KeeperException.NoWatcherException nwe) { 715 // expected 716 } 717 } 718 719 /** 720 * Test verifies null watcher 721 */ 722 @ParameterizedTest 723 @ValueSource(booleans = {true, false}) 724 @Timeout(value = 30) testNullWatcherReference(boolean useAsync)725 public void testNullWatcherReference(boolean useAsync) throws Exception { 726 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 727 try { 728 if (useAsync) { 729 zk1.removeWatches("/node1", null, WatcherType.Data, false, null, null); 730 } else { 731 zk1.removeWatches("/node1", null, WatcherType.Data, false); 732 } 733 fail("Must throw IllegalArgumentException as watcher is null!"); 734 } catch (IllegalArgumentException iae) { 735 // expected 736 } 737 } 738 739 /** 740 * Test verifies WatcherType.Data - removes only the configured data watcher 741 * function 742 */ 743 @ParameterizedTest 744 @ValueSource(booleans = {true, false}) 745 @Timeout(value = 90) testRemoveWhenMultipleDataWatchesOnAPath(boolean useAsync)746 public void testRemoveWhenMultipleDataWatchesOnAPath(boolean useAsync) throws Exception { 747 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 748 final CountDownLatch dataWatchCount = new CountDownLatch(1); 749 final CountDownLatch rmWatchCount = new CountDownLatch(1); 750 Watcher w1 = event -> { 751 if (event.getType() == EventType.DataWatchRemoved) { 752 rmWatchCount.countDown(); 753 } 754 }; 755 Watcher w2 = event -> { 756 if (event.getType() == EventType.NodeDataChanged) { 757 dataWatchCount.countDown(); 758 } 759 }; 760 // Add multiple data watches 761 LOG.info("Adding data watcher {} on path {}", w1, "/node1"); 762 assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches"); 763 LOG.info("Adding data watcher {} on path {}", w2, "/node1"); 764 assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches"); 765 766 removeWatches(zk2, "/node1", w1, WatcherType.Data, false, Code.OK, useAsync); 767 assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher"); 768 769 zk1.setData("/node1", "test".getBytes(), -1); 770 LOG.info("Waiting for data watchers to be notified"); 771 assertTrue(dataWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't get data watch notification!"); 772 } 773 774 /** 775 * Test verifies WatcherType.Children - removes only the configured child 776 * watcher function 777 */ 778 @ParameterizedTest 779 @ValueSource(booleans = {true, false}) 780 @Timeout(value = 90) testRemoveWhenMultipleChildWatchesOnAPath(boolean useAsync)781 public void testRemoveWhenMultipleChildWatchesOnAPath(boolean useAsync) throws Exception { 782 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 783 final CountDownLatch childWatchCount = new CountDownLatch(1); 784 final CountDownLatch rmWatchCount = new CountDownLatch(1); 785 Watcher w1 = event -> { 786 if (event.getType() == EventType.ChildWatchRemoved) { 787 rmWatchCount.countDown(); 788 } 789 }; 790 Watcher w2 = event -> { 791 if (event.getType() == EventType.NodeChildrenChanged) { 792 childWatchCount.countDown(); 793 } 794 }; 795 // Add multiple child watches 796 LOG.info("Adding child watcher {} on path {}", w1, "/node1"); 797 assertEquals(0, zk2.getChildren("/node1", w1).size(), "Didn't set child watches"); 798 LOG.info("Adding child watcher {} on path {}", w2, "/node1"); 799 assertEquals(0, zk2.getChildren("/node1", w2).size(), "Didn't set child watches"); 800 801 removeWatches(zk2, "/node1", w1, WatcherType.Children, false, Code.OK, useAsync); 802 assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove child watcher"); 803 804 zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 805 LOG.info("Waiting for child watchers to be notified"); 806 assertTrue(childWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't get child watch notification!"); 807 } 808 809 /** 810 * Test verifies WatcherType.Data - removes only the configured data watcher 811 * function 812 */ 813 @ParameterizedTest 814 @ValueSource(booleans = {true, false}) 815 @Timeout(value = 90) testRemoveAllDataWatchesOnAPath(boolean useAsync)816 public void testRemoveAllDataWatchesOnAPath(boolean useAsync) throws Exception { 817 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 818 final CountDownLatch dWatchCount = new CountDownLatch(2); 819 final CountDownLatch rmWatchCount = new CountDownLatch(2); 820 Watcher w1 = event -> { 821 switch (event.getType()) { 822 case DataWatchRemoved: 823 rmWatchCount.countDown(); 824 break; 825 case NodeDataChanged: 826 dWatchCount.countDown(); 827 break; 828 default: 829 break; 830 } 831 }; 832 Watcher w2 = event -> { 833 switch (event.getType()) { 834 case DataWatchRemoved: 835 rmWatchCount.countDown(); 836 break; 837 case NodeDataChanged: 838 dWatchCount.countDown(); 839 break; 840 default: 841 break; 842 } 843 }; 844 // Add multiple data watches 845 LOG.info("Adding data watcher {} on path {}", w1, "/node1"); 846 assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches"); 847 LOG.info("Adding data watcher {} on path {}", w2, "/node1"); 848 assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches"); 849 850 assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is not a watcher"); 851 removeAllWatches(zk2, "/node1", WatcherType.Data, false, Code.OK, useAsync); 852 assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher"); 853 854 assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is still a watcher after removal"); 855 } 856 857 /** 858 * Test verifies WatcherType.Children - removes only the configured child 859 * watcher function 860 */ 861 @ParameterizedTest 862 @ValueSource(booleans = {true, false}) 863 @Timeout(value = 90) testRemoveAllChildWatchesOnAPath(boolean useAsync)864 public void testRemoveAllChildWatchesOnAPath(boolean useAsync) throws Exception { 865 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 866 final CountDownLatch cWatchCount = new CountDownLatch(2); 867 final CountDownLatch rmWatchCount = new CountDownLatch(2); 868 Watcher w1 = event -> { 869 switch (event.getType()) { 870 case ChildWatchRemoved: 871 rmWatchCount.countDown(); 872 break; 873 case NodeChildrenChanged: 874 cWatchCount.countDown(); 875 break; 876 default: 877 break; 878 } 879 }; 880 Watcher w2 = event -> { 881 switch (event.getType()) { 882 case ChildWatchRemoved: 883 rmWatchCount.countDown(); 884 break; 885 case NodeChildrenChanged: 886 cWatchCount.countDown(); 887 break; 888 default: 889 break; 890 } 891 }; 892 // Add multiple child watches 893 LOG.info("Adding child watcher {} on path {}", w1, "/node1"); 894 assertEquals(0, zk2.getChildren("/node1", w1).size(), "Didn't set child watches"); 895 LOG.info("Adding child watcher {} on path {}", w2, "/node1"); 896 assertEquals(0, zk2.getChildren("/node1", w2).size(), "Didn't set child watches"); 897 898 assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is not a watcher"); 899 removeAllWatches(zk2, "/node1", WatcherType.Children, false, Code.OK, useAsync); 900 assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove child watcher"); 901 902 assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is still a watcher after removal"); 903 } 904 905 /** 906 * Test verifies WatcherType.Any - removes all the configured child,data 907 * watcher functions 908 */ 909 @ParameterizedTest 910 @ValueSource(booleans = {true, false}) 911 @Timeout(value = 90) testRemoveAllWatchesOnAPath(boolean useAsync)912 public void testRemoveAllWatchesOnAPath(boolean useAsync) throws Exception { 913 zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 914 final CountDownLatch watchCount = new CountDownLatch(2); 915 final CountDownLatch rmWatchCount = new CountDownLatch(4); 916 Watcher w1 = event -> { 917 switch (event.getType()) { 918 case ChildWatchRemoved: 919 case DataWatchRemoved: 920 rmWatchCount.countDown(); 921 break; 922 case NodeChildrenChanged: 923 case NodeDataChanged: 924 watchCount.countDown(); 925 break; 926 default: 927 break; 928 } 929 }; 930 Watcher w2 = event -> { 931 switch (event.getType()) { 932 case ChildWatchRemoved: 933 case DataWatchRemoved: 934 rmWatchCount.countDown(); 935 break; 936 case NodeChildrenChanged: 937 case NodeDataChanged: 938 watchCount.countDown(); 939 break; 940 default: 941 break; 942 } 943 }; 944 // Add multiple child watches 945 LOG.info("Adding child watcher {} on path {}", w1, "/node1"); 946 assertEquals(0, zk2.getChildren("/node1", w1).size(), "Didn't set child watches"); 947 LOG.info("Adding child watcher {} on path {}", w2, "/node1"); 948 assertEquals(0, zk2.getChildren("/node1", w2).size(), "Didn't set child watches"); 949 950 // Add multiple data watches 951 LOG.info("Adding data watcher {} on path {}", w1, "/node1"); 952 assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches"); 953 LOG.info("Adding data watcher {} on path {}", w2, "/node1"); 954 assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches"); 955 956 assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is not a watcher"); 957 removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.OK, useAsync); 958 assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher"); 959 assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is still a watcher after removal"); 960 assertEquals(2, watchCount.getCount(), "Received watch notification after removal!"); 961 } 962 963 private static class MyWatchManager extends ZKWatchManager { 964 965 int lastReturnCode; 966 MyWatchManager(boolean disableAutoWatchReset, Watcher defaultWatcher)967 MyWatchManager(boolean disableAutoWatchReset, Watcher defaultWatcher) { 968 super(disableAutoWatchReset, defaultWatcher); 969 } 970 containsWatcher(String path, Watcher watcher, WatcherType watcherType)971 void containsWatcher(String path, Watcher watcher, WatcherType watcherType) { 972 // prevent contains watcher 973 } 974 975 @Override removeWatches( Map<String, Set<Watcher>> pathVsWatcher, Watcher watcher, String path, boolean local, int rc, Set<Watcher> removedWatchers)976 protected boolean removeWatches( 977 Map<String, Set<Watcher>> pathVsWatcher, 978 Watcher watcher, 979 String path, 980 boolean local, 981 int rc, 982 Set<Watcher> removedWatchers) { 983 lastReturnCode = rc; 984 return false; 985 } 986 } 987 988 private static class MyWatcher implements Watcher { 989 990 private final String path; 991 private String eventPath; 992 private CountDownLatch latch; 993 private List<EventType> eventsAfterWatchRemoval = new ArrayList<EventType>(); MyWatcher(String path, int count)994 MyWatcher(String path, int count) { 995 this.path = path; 996 latch = new CountDownLatch(count); 997 } 998 process(WatchedEvent event)999 public void process(WatchedEvent event) { 1000 LOG.debug("Event path : {}, eventPath : {}", path, event.getPath()); 1001 this.eventPath = event.getPath(); 1002 // notifies watcher removal 1003 if (latch.getCount() == 0) { 1004 if (event.getType() != EventType.None) { 1005 eventsAfterWatchRemoval.add(event.getType()); 1006 } 1007 } 1008 if (event.getType() == EventType.ChildWatchRemoved || event.getType() == EventType.DataWatchRemoved) { 1009 latch.countDown(); 1010 } 1011 } 1012 1013 /** 1014 * Returns true if the watcher was triggered. Try to avoid using this 1015 * method with assertFalse statements. A false return depends on a timed 1016 * out wait on a latch, which makes tests run long. 1017 * 1018 * @return true if the watcher was triggered, false otherwise 1019 * @throws InterruptedException if interrupted while waiting on latch 1020 */ matches()1021 public boolean matches() throws InterruptedException { 1022 if (!latch.await(CONNECTION_TIMEOUT / 5, TimeUnit.MILLISECONDS)) { 1023 LOG.error("Failed waiting to remove the watches"); 1024 return false; 1025 } 1026 LOG.debug("Client path : {} eventPath : {}", path, eventPath); 1027 return path.equals(eventPath); 1028 } 1029 getEventsAfterWatchRemoval()1030 public List<EventType> getEventsAfterWatchRemoval() { 1031 return eventsAfterWatchRemoval; 1032 } 1033 1034 } 1035 1036 private class MyCallback implements AsyncCallback.VoidCallback { 1037 1038 private final String path; 1039 private final int rc; 1040 private String eventPath; 1041 int eventRc; 1042 private CountDownLatch latch = new CountDownLatch(1); 1043 MyCallback(int rc, String path)1044 public MyCallback(int rc, String path) { 1045 this.rc = rc; 1046 this.path = path; 1047 } 1048 1049 @Override processResult(int rc, String eventPath, Object ctx)1050 public void processResult(int rc, String eventPath, Object ctx) { 1051 System.out.println("latch:" + path + " " + eventPath); 1052 this.eventPath = eventPath; 1053 this.eventRc = rc; 1054 this.latch.countDown(); 1055 } 1056 1057 /** 1058 * Returns true if the callback was triggered. Try to avoid using this 1059 * method with assertFalse statements. A false return depends on a timed 1060 * out wait on a latch, which makes tests run long. 1061 * 1062 * @return true if the watcher was triggered, false otherwise 1063 * @throws InterruptedException if interrupted while waiting on latch 1064 */ matches()1065 public boolean matches() throws InterruptedException { 1066 if (!latch.await(CONNECTION_TIMEOUT / 5, TimeUnit.MILLISECONDS)) { 1067 return false; 1068 } 1069 return path.equals(eventPath) && rc == eventRc; 1070 } 1071 1072 } 1073 1074 /** 1075 * Checks if a session is registered with the server as a watcher. 1076 * 1077 * @param sessionId the session ID to check 1078 * @param path the path to check for watchers 1079 * @param type the type of watcher 1080 * @return true if the client session is a watcher on path for the type 1081 */ isServerSessionWatcher(long sessionId, String path, WatcherType type)1082 private boolean isServerSessionWatcher(long sessionId, String path, WatcherType type) { 1083 Set<ServerCnxn> cnxns = new HashSet<>(); 1084 CollectionUtils.addAll(cnxns, serverFactory.getConnections().iterator()); 1085 for (ServerCnxn cnxn : cnxns) { 1086 if (cnxn.getSessionId() == sessionId) { 1087 return serverFactory.getZooKeeperServer().getZKDatabase().getDataTree().containsWatcher(path, type, cnxn); 1088 } 1089 } 1090 return false; 1091 } 1092 1093 } 1094