1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.zookeeper.server.watch; 20 21 import java.io.PrintWriter; 22 import java.util.BitSet; 23 import java.util.HashMap; 24 import java.util.HashSet; 25 import java.util.Map; 26 import java.util.Map.Entry; 27 import java.util.Set; 28 import java.util.concurrent.ConcurrentHashMap; 29 import java.util.concurrent.locks.ReentrantReadWriteLock; 30 import org.apache.zookeeper.WatchedEvent; 31 import org.apache.zookeeper.Watcher; 32 import org.apache.zookeeper.Watcher.Event.EventType; 33 import org.apache.zookeeper.Watcher.Event.KeeperState; 34 import org.apache.zookeeper.server.ServerCnxn; 35 import org.apache.zookeeper.server.ServerMetrics; 36 import org.apache.zookeeper.server.util.BitHashSet; 37 import org.apache.zookeeper.server.util.BitMap; 38 import org.slf4j.Logger; 39 import org.slf4j.LoggerFactory; 40 41 /** 42 * Optimized in memory and time complexity, compared to WatchManager, both the 43 * memory consumption and time complexity improved a lot, but it cannot 44 * efficiently remove the watcher when the session or socket is closed, for 45 * majority use case this is not a problem. 46 * 47 * Changed made compared to WatchManager: 48 * 49 * - Use HashSet and BitSet to store the watchers to find a balance between 50 * memory usage and time complexity 51 * - Use ReadWriteLock instead of synchronized to reduce lock retention 52 * - Lazily clean up the closed watchers 53 */ 54 public class WatchManagerOptimized implements IWatchManager, IDeadWatcherListener { 55 56 private static final Logger LOG = LoggerFactory.getLogger(WatchManagerOptimized.class); 57 58 private final ConcurrentHashMap<String, BitHashSet> pathWatches = new ConcurrentHashMap<String, BitHashSet>(); 59 60 // watcher to bit id mapping 61 private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>(); 62 63 // used to lazily remove the dead watchers 64 private final WatcherCleaner watcherCleaner; 65 66 private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); 67 WatchManagerOptimized()68 public WatchManagerOptimized() { 69 watcherCleaner = new WatcherCleaner(this); 70 watcherCleaner.start(); 71 } 72 73 @Override addWatch(String path, Watcher watcher)74 public boolean addWatch(String path, Watcher watcher) { 75 boolean result = false; 76 // Need readLock to exclusively lock with removeWatcher, otherwise we 77 // may add a dead watch whose connection was just closed. 78 // 79 // Creating new watcher bit and adding it to the BitHashSet has it's 80 // own lock to minimize the write lock scope 81 addRemovePathRWLock.readLock().lock(); 82 try { 83 // avoid race condition of adding a on flying dead watcher 84 if (isDeadWatcher(watcher)) { 85 LOG.debug("Ignoring addWatch with closed cnxn"); 86 } else { 87 Integer bit = watcherBitIdMap.add(watcher); 88 BitHashSet watchers = pathWatches.get(path); 89 if (watchers == null) { 90 watchers = new BitHashSet(); 91 BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); 92 // it's possible multiple thread might add to pathWatches 93 // while we're holding read lock, so we need this check 94 // here 95 if (existingWatchers != null) { 96 watchers = existingWatchers; 97 } 98 } 99 result = watchers.add(bit); 100 } 101 } finally { 102 addRemovePathRWLock.readLock().unlock(); 103 } 104 return result; 105 } 106 107 /** 108 * Used in the OpCode.checkWatches, which is a read operation, since read 109 * and write requests are exclusively processed, we don't need to hold 110 * lock here. 111 * 112 * Different from addWatch this method doesn't mutate any state, so we don't 113 * need to hold read lock to avoid dead watcher (cnxn closed) being added 114 * to the watcher manager. 115 * 116 * It's possible that before we lazily clean up the dead watcher, this will 117 * return true, but since the cnxn is closed, the response will dropped as 118 * well, so it doesn't matter. 119 */ 120 @Override containsWatcher(String path, Watcher watcher)121 public boolean containsWatcher(String path, Watcher watcher) { 122 BitHashSet watchers = pathWatches.get(path); 123 return watchers != null && watchers.contains(watcherBitIdMap.getBit(watcher)); 124 } 125 126 @Override removeWatcher(String path, Watcher watcher)127 public boolean removeWatcher(String path, Watcher watcher) { 128 // Hold write lock directly because removeWatcher request is more 129 // likely to be invoked when the watcher is actually exist and 130 // haven't fired yet, so instead of having read lock to check existence 131 // before switching to write one, it's actually cheaper to hold write 132 // lock directly here. 133 addRemovePathRWLock.writeLock().lock(); 134 try { 135 BitHashSet list = pathWatches.get(path); 136 if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) { 137 return false; 138 } 139 if (list.isEmpty()) { 140 pathWatches.remove(path); 141 } 142 return true; 143 } finally { 144 addRemovePathRWLock.writeLock().unlock(); 145 } 146 } 147 148 @Override removeWatcher(Watcher watcher)149 public void removeWatcher(Watcher watcher) { 150 Integer watcherBit; 151 // Use exclusive lock with addWatcher to guarantee that we won't add 152 // watch for a cnxn which is already closed. 153 addRemovePathRWLock.writeLock().lock(); 154 try { 155 // do nothing if the watcher is not tracked 156 watcherBit = watcherBitIdMap.getBit(watcher); 157 if (watcherBit == null) { 158 return; 159 } 160 } finally { 161 addRemovePathRWLock.writeLock().unlock(); 162 } 163 164 // We can guarantee that when this line is executed, the cnxn of this 165 // watcher has already been marked as stale (this method is only called 166 // from ServerCnxn.close after we set stale), which means no watches 167 // will be added to the watcher manager with this watcher, so that we 168 // can safely clean up this dead watcher. 169 // 170 // So it's not necessary to have this line in the addRemovePathRWLock. 171 // And moving the addDeadWatcher out of the locking block to avoid 172 // holding the write lock while we're blocked on adding dead watchers 173 // into the watcherCleaner. 174 watcherCleaner.addDeadWatcher(watcherBit); 175 } 176 177 /** 178 * Entry for WatcherCleaner to remove dead watchers 179 * 180 * @param deadWatchers the watchers need to be removed 181 */ 182 @Override processDeadWatchers(Set<Integer> deadWatchers)183 public void processDeadWatchers(Set<Integer> deadWatchers) { 184 // All the watchers being processed here are guaranteed to be dead, 185 // no watches will be added for those dead watchers, that's why I 186 // don't need to have addRemovePathRWLock here. 187 BitSet bits = new BitSet(); 188 for (int dw : deadWatchers) { 189 bits.set(dw); 190 } 191 // The value iterator will reflect the state when it was 192 // created, don't need to synchronize. 193 for (BitHashSet watchers : pathWatches.values()) { 194 watchers.remove(deadWatchers, bits); 195 } 196 // Better to remove the empty path from pathWatches, but it will add 197 // lot of lock contention and affect the throughput of addWatch, 198 // let's rely on the triggerWatch to delete it. 199 for (Integer wbit : deadWatchers) { 200 watcherBitIdMap.remove(wbit); 201 } 202 } 203 204 @Override triggerWatch(String path, EventType type)205 public WatcherOrBitSet triggerWatch(String path, EventType type) { 206 return triggerWatch(path, type, null); 207 } 208 209 @Override triggerWatch(String path, EventType type, WatcherOrBitSet suppress)210 public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress) { 211 WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); 212 213 BitHashSet watchers = remove(path); 214 if (watchers == null) { 215 return null; 216 } 217 218 int triggeredWatches = 0; 219 220 // Avoid race condition between dead watcher cleaner in 221 // WatcherCleaner and iterating here 222 synchronized (watchers) { 223 for (Integer wBit : watchers) { 224 if (suppress != null && suppress.contains(wBit)) { 225 continue; 226 } 227 228 Watcher w = watcherBitIdMap.get(wBit); 229 230 // skip dead watcher 231 if (w == null || isDeadWatcher(w)) { 232 continue; 233 } 234 235 w.process(e); 236 triggeredWatches++; 237 } 238 } 239 240 updateMetrics(type, triggeredWatches); 241 return new WatcherOrBitSet(watchers); 242 } 243 244 @Override size()245 public int size() { 246 int size = 0; 247 for (BitHashSet watches : pathWatches.values()) { 248 size += watches.size(); 249 } 250 return size; 251 } 252 253 @Override shutdown()254 public void shutdown() { 255 if (watcherCleaner != null) { 256 watcherCleaner.shutdown(); 257 } 258 } 259 remove(String path)260 private BitHashSet remove(String path) { 261 addRemovePathRWLock.writeLock().lock(); 262 try { 263 return pathWatches.remove(path); 264 } finally { 265 addRemovePathRWLock.writeLock().unlock(); 266 } 267 } 268 updateMetrics(final EventType type, int size)269 void updateMetrics(final EventType type, int size) { 270 switch (type) { 271 case NodeCreated: 272 ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(size); 273 break; 274 275 case NodeDeleted: 276 ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(size); 277 break; 278 279 case NodeDataChanged: 280 ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(size); 281 break; 282 283 case NodeChildrenChanged: 284 ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(size); 285 break; 286 default: 287 // Other types not logged. 288 break; 289 } 290 } 291 isDeadWatcher(Watcher watcher)292 boolean isDeadWatcher(Watcher watcher) { 293 return watcher instanceof ServerCnxn && ((ServerCnxn) watcher).isStale(); 294 } 295 pathSize()296 int pathSize() { 297 return pathWatches.size(); 298 } 299 300 @Override getWatchesSummary()301 public WatchesSummary getWatchesSummary() { 302 return new WatchesSummary(watcherBitIdMap.size(), pathSize(), size()); 303 } 304 305 @Override getWatches()306 public WatchesReport getWatches() { 307 Map<Long, Set<String>> id2paths = new HashMap<Long, Set<String>>(); 308 for (Entry<Watcher, Set<String>> e : getWatcher2PathesMap().entrySet()) { 309 Long id = ((ServerCnxn) e.getKey()).getSessionId(); 310 Set<String> paths = new HashSet<String>(e.getValue()); 311 id2paths.put(id, paths); 312 } 313 return new WatchesReport(id2paths); 314 } 315 316 /** 317 * Iterate through ConcurrentHashMap is 'safe', it will reflect the state 318 * of the map at the time iteration began, may miss update while iterating, 319 * given this is used in the commands to get a general idea of the watches 320 * state, we don't care about missing some update. 321 */ 322 @Override getWatchesByPath()323 public WatchesPathReport getWatchesByPath() { 324 Map<String, Set<Long>> path2ids = new HashMap<String, Set<Long>>(); 325 for (Entry<String, BitHashSet> e : pathWatches.entrySet()) { 326 BitHashSet watchers = e.getValue(); 327 synchronized (watchers) { 328 Set<Long> ids = new HashSet<Long>(watchers.size()); 329 path2ids.put(e.getKey(), ids); 330 for (Integer wbit : watchers) { 331 Watcher watcher = watcherBitIdMap.get(wbit); 332 if (watcher instanceof ServerCnxn) { 333 ids.add(((ServerCnxn) watcher).getSessionId()); 334 } 335 } 336 } 337 } 338 return new WatchesPathReport(path2ids); 339 } 340 341 /** 342 * May cause OOM if there are lots of watches, might better to forbid 343 * it in this class. 344 */ getWatcher2PathesMap()345 public Map<Watcher, Set<String>> getWatcher2PathesMap() { 346 Map<Watcher, Set<String>> watcher2paths = new HashMap<Watcher, Set<String>>(); 347 for (Entry<String, BitHashSet> e : pathWatches.entrySet()) { 348 String path = e.getKey(); 349 BitHashSet watchers = e.getValue(); 350 // avoid race condition with add/remove 351 synchronized (watchers) { 352 for (Integer wbit : watchers) { 353 Watcher w = watcherBitIdMap.get(wbit); 354 if (w == null) { 355 continue; 356 } 357 if (!watcher2paths.containsKey(w)) { 358 watcher2paths.put(w, new HashSet<String>()); 359 } 360 watcher2paths.get(w).add(path); 361 } 362 } 363 } 364 return watcher2paths; 365 } 366 367 @Override dumpWatches(PrintWriter pwriter, boolean byPath)368 public void dumpWatches(PrintWriter pwriter, boolean byPath) { 369 if (byPath) { 370 for (Entry<String, BitHashSet> e : pathWatches.entrySet()) { 371 pwriter.println(e.getKey()); 372 BitHashSet watchers = e.getValue(); 373 synchronized (watchers) { 374 for (Integer wbit : watchers) { 375 Watcher w = watcherBitIdMap.get(wbit); 376 if (!(w instanceof ServerCnxn)) { 377 continue; 378 } 379 pwriter.print("\t0x"); 380 pwriter.print(Long.toHexString(((ServerCnxn) w).getSessionId())); 381 pwriter.print("\n"); 382 } 383 } 384 } 385 } else { 386 for (Entry<Watcher, Set<String>> e : getWatcher2PathesMap().entrySet()) { 387 pwriter.print("0x"); 388 pwriter.println(Long.toHexString(((ServerCnxn) e.getKey()).getSessionId())); 389 for (String path : e.getValue()) { 390 pwriter.print("\t"); 391 pwriter.println(path); 392 } 393 } 394 } 395 } 396 397 @Override toString()398 public String toString() { 399 StringBuilder sb = new StringBuilder(); 400 sb.append(watcherBitIdMap.size()).append(" connections watching ").append(pathSize()).append(" paths\n"); 401 sb.append("Total watches:").append(size()); 402 return sb.toString(); 403 } 404 405 } 406