1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.zookeeper.server.watch;
20 
21 import java.io.PrintWriter;
22 import java.util.BitSet;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.Map;
26 import java.util.Map.Entry;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.locks.ReentrantReadWriteLock;
30 import org.apache.zookeeper.WatchedEvent;
31 import org.apache.zookeeper.Watcher;
32 import org.apache.zookeeper.Watcher.Event.EventType;
33 import org.apache.zookeeper.Watcher.Event.KeeperState;
34 import org.apache.zookeeper.server.ServerCnxn;
35 import org.apache.zookeeper.server.ServerMetrics;
36 import org.apache.zookeeper.server.util.BitHashSet;
37 import org.apache.zookeeper.server.util.BitMap;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 
/**
 * A watch manager optimized for memory and time complexity. Compared to
 * WatchManager, both the memory consumption and the time complexity are
 * improved a lot, but it cannot efficiently remove a watcher when its
 * session or socket is closed; for the majority of use cases this is not a
 * problem.
 *
 * Changes made compared to WatchManager:
 *
 * - Use HashSet and BitSet to store the watchers to find a balance between
 *   memory usage and time complexity
 * - Use ReadWriteLock instead of synchronized to reduce lock contention
 * - Lazily clean up the closed watchers
 */
54 public class WatchManagerOptimized implements IWatchManager, IDeadWatcherListener {
55 
56     private static final Logger LOG = LoggerFactory.getLogger(WatchManagerOptimized.class);
57 
58     private final ConcurrentHashMap<String, BitHashSet> pathWatches = new ConcurrentHashMap<String, BitHashSet>();
59 
60     // watcher to bit id mapping
61     private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
62 
63     // used to lazily remove the dead watchers
64     private final WatcherCleaner watcherCleaner;
65 
66     private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
67 
WatchManagerOptimized()68     public WatchManagerOptimized() {
69         watcherCleaner = new WatcherCleaner(this);
70         watcherCleaner.start();
71     }
72 
73     @Override
addWatch(String path, Watcher watcher)74     public boolean addWatch(String path, Watcher watcher) {
75         boolean result = false;
76         // Need readLock to exclusively lock with removeWatcher, otherwise we
77         // may add a dead watch whose connection was just closed.
78         //
79         // Creating new watcher bit and adding it to the BitHashSet has it's
80         // own lock to minimize the write lock scope
81         addRemovePathRWLock.readLock().lock();
82         try {
83             // avoid race condition of adding a on flying dead watcher
84             if (isDeadWatcher(watcher)) {
85                 LOG.debug("Ignoring addWatch with closed cnxn");
86             } else {
87                 Integer bit = watcherBitIdMap.add(watcher);
88                 BitHashSet watchers = pathWatches.get(path);
89                 if (watchers == null) {
90                     watchers = new BitHashSet();
91                     BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
92                     // it's possible multiple thread might add to pathWatches
93                     // while we're holding read lock, so we need this check
94                     // here
95                     if (existingWatchers != null) {
96                         watchers = existingWatchers;
97                     }
98                 }
99                 result = watchers.add(bit);
100             }
101         } finally {
102             addRemovePathRWLock.readLock().unlock();
103         }
104         return result;
105     }
106 
107     /**
108      * Used in the OpCode.checkWatches, which is a read operation, since read
109      * and write requests are exclusively processed, we don't need to hold
110      * lock here.
111      *
112      * Different from addWatch this method doesn't mutate any state, so we don't
113      * need to hold read lock to avoid dead watcher (cnxn closed) being added
114      * to the watcher manager.
115      *
116      * It's possible that before we lazily clean up the dead watcher, this will
117      * return true, but since the cnxn is closed, the response will dropped as
118      * well, so it doesn't matter.
119      */
120     @Override
containsWatcher(String path, Watcher watcher)121     public boolean containsWatcher(String path, Watcher watcher) {
122         BitHashSet watchers = pathWatches.get(path);
123         return watchers != null && watchers.contains(watcherBitIdMap.getBit(watcher));
124     }
125 
126     @Override
removeWatcher(String path, Watcher watcher)127     public boolean removeWatcher(String path, Watcher watcher) {
128         // Hold write lock directly because removeWatcher request is more
129         // likely to be invoked when the watcher is actually exist and
130         // haven't fired yet, so instead of having read lock to check existence
131         // before switching to write one, it's actually cheaper to hold write
132         // lock directly here.
133         addRemovePathRWLock.writeLock().lock();
134         try {
135             BitHashSet list = pathWatches.get(path);
136             if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
137                 return false;
138             }
139             if (list.isEmpty()) {
140                 pathWatches.remove(path);
141             }
142             return true;
143         } finally {
144             addRemovePathRWLock.writeLock().unlock();
145         }
146     }
147 
148     @Override
removeWatcher(Watcher watcher)149     public void removeWatcher(Watcher watcher) {
150         Integer watcherBit;
151         // Use exclusive lock with addWatcher to guarantee that we won't add
152         // watch for a cnxn which is already closed.
153         addRemovePathRWLock.writeLock().lock();
154         try {
155             // do nothing if the watcher is not tracked
156             watcherBit = watcherBitIdMap.getBit(watcher);
157             if (watcherBit == null) {
158                 return;
159             }
160         } finally {
161             addRemovePathRWLock.writeLock().unlock();
162         }
163 
164         // We can guarantee that when this line is executed, the cnxn of this
165         // watcher has already been marked as stale (this method is only called
166         // from ServerCnxn.close after we set stale), which means no watches
167         // will be added to the watcher manager with this watcher, so that we
168         // can safely clean up this dead watcher.
169         //
170         // So it's not necessary to have this line in the addRemovePathRWLock.
171         // And moving the addDeadWatcher out of the locking block to avoid
172         // holding the write lock while we're blocked on adding dead watchers
173         // into the watcherCleaner.
174         watcherCleaner.addDeadWatcher(watcherBit);
175     }
176 
177     /**
178      * Entry for WatcherCleaner to remove dead watchers
179      *
180      * @param deadWatchers the watchers need to be removed
181      */
182     @Override
processDeadWatchers(Set<Integer> deadWatchers)183     public void processDeadWatchers(Set<Integer> deadWatchers) {
184         // All the watchers being processed here are guaranteed to be dead,
185         // no watches will be added for those dead watchers, that's why I
186         // don't need to have addRemovePathRWLock here.
187         BitSet bits = new BitSet();
188         for (int dw : deadWatchers) {
189             bits.set(dw);
190         }
191         // The value iterator will reflect the state when it was
192         // created, don't need to synchronize.
193         for (BitHashSet watchers : pathWatches.values()) {
194             watchers.remove(deadWatchers, bits);
195         }
196         // Better to remove the empty path from pathWatches, but it will add
197         // lot of lock contention and affect the throughput of addWatch,
198         // let's rely on the triggerWatch to delete it.
199         for (Integer wbit : deadWatchers) {
200             watcherBitIdMap.remove(wbit);
201         }
202     }
203 
204     @Override
triggerWatch(String path, EventType type)205     public WatcherOrBitSet triggerWatch(String path, EventType type) {
206         return triggerWatch(path, type, null);
207     }
208 
209     @Override
triggerWatch(String path, EventType type, WatcherOrBitSet suppress)210     public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress) {
211         WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
212 
213         BitHashSet watchers = remove(path);
214         if (watchers == null) {
215             return null;
216         }
217 
218         int triggeredWatches = 0;
219 
220         // Avoid race condition between dead watcher cleaner in
221         // WatcherCleaner and iterating here
222         synchronized (watchers) {
223             for (Integer wBit : watchers) {
224                 if (suppress != null && suppress.contains(wBit)) {
225                     continue;
226                 }
227 
228                 Watcher w = watcherBitIdMap.get(wBit);
229 
230                 // skip dead watcher
231                 if (w == null || isDeadWatcher(w)) {
232                     continue;
233                 }
234 
235                 w.process(e);
236                 triggeredWatches++;
237             }
238         }
239 
240         updateMetrics(type, triggeredWatches);
241         return new WatcherOrBitSet(watchers);
242     }
243 
244     @Override
size()245     public int size() {
246         int size = 0;
247         for (BitHashSet watches : pathWatches.values()) {
248             size += watches.size();
249         }
250         return size;
251     }
252 
253     @Override
shutdown()254     public void shutdown() {
255         if (watcherCleaner != null) {
256             watcherCleaner.shutdown();
257         }
258     }
259 
remove(String path)260     private BitHashSet remove(String path) {
261         addRemovePathRWLock.writeLock().lock();
262         try {
263             return pathWatches.remove(path);
264         } finally {
265             addRemovePathRWLock.writeLock().unlock();
266         }
267     }
268 
updateMetrics(final EventType type, int size)269     void updateMetrics(final EventType type, int size) {
270         switch (type) {
271         case NodeCreated:
272             ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(size);
273             break;
274 
275         case NodeDeleted:
276             ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(size);
277             break;
278 
279         case NodeDataChanged:
280             ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(size);
281             break;
282 
283         case NodeChildrenChanged:
284             ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(size);
285             break;
286         default:
287             // Other types not logged.
288             break;
289         }
290     }
291 
isDeadWatcher(Watcher watcher)292     boolean isDeadWatcher(Watcher watcher) {
293         return watcher instanceof ServerCnxn && ((ServerCnxn) watcher).isStale();
294     }
295 
pathSize()296     int pathSize() {
297         return pathWatches.size();
298     }
299 
300     @Override
getWatchesSummary()301     public WatchesSummary getWatchesSummary() {
302         return new WatchesSummary(watcherBitIdMap.size(), pathSize(), size());
303     }
304 
305     @Override
getWatches()306     public WatchesReport getWatches() {
307         Map<Long, Set<String>> id2paths = new HashMap<Long, Set<String>>();
308         for (Entry<Watcher, Set<String>> e : getWatcher2PathesMap().entrySet()) {
309             Long id = ((ServerCnxn) e.getKey()).getSessionId();
310             Set<String> paths = new HashSet<String>(e.getValue());
311             id2paths.put(id, paths);
312         }
313         return new WatchesReport(id2paths);
314     }
315 
316     /**
317      * Iterate through ConcurrentHashMap is 'safe', it will reflect the state
318      * of the map at the time iteration began, may miss update while iterating,
319      * given this is used in the commands to get a general idea of the watches
320      * state, we don't care about missing some update.
321      */
322     @Override
getWatchesByPath()323     public WatchesPathReport getWatchesByPath() {
324         Map<String, Set<Long>> path2ids = new HashMap<String, Set<Long>>();
325         for (Entry<String, BitHashSet> e : pathWatches.entrySet()) {
326             BitHashSet watchers = e.getValue();
327             synchronized (watchers) {
328                 Set<Long> ids = new HashSet<Long>(watchers.size());
329                 path2ids.put(e.getKey(), ids);
330                 for (Integer wbit : watchers) {
331                     Watcher watcher = watcherBitIdMap.get(wbit);
332                     if (watcher instanceof ServerCnxn) {
333                         ids.add(((ServerCnxn) watcher).getSessionId());
334                     }
335                 }
336             }
337         }
338         return new WatchesPathReport(path2ids);
339     }
340 
341     /**
342      * May cause OOM if there are lots of watches, might better to forbid
343      * it in this class.
344      */
getWatcher2PathesMap()345     public Map<Watcher, Set<String>> getWatcher2PathesMap() {
346         Map<Watcher, Set<String>> watcher2paths = new HashMap<Watcher, Set<String>>();
347         for (Entry<String, BitHashSet> e : pathWatches.entrySet()) {
348             String path = e.getKey();
349             BitHashSet watchers = e.getValue();
350             // avoid race condition with add/remove
351             synchronized (watchers) {
352                 for (Integer wbit : watchers) {
353                     Watcher w = watcherBitIdMap.get(wbit);
354                     if (w == null) {
355                         continue;
356                     }
357                     if (!watcher2paths.containsKey(w)) {
358                         watcher2paths.put(w, new HashSet<String>());
359                     }
360                     watcher2paths.get(w).add(path);
361                 }
362             }
363         }
364         return watcher2paths;
365     }
366 
367     @Override
dumpWatches(PrintWriter pwriter, boolean byPath)368     public void dumpWatches(PrintWriter pwriter, boolean byPath) {
369         if (byPath) {
370             for (Entry<String, BitHashSet> e : pathWatches.entrySet()) {
371                 pwriter.println(e.getKey());
372                 BitHashSet watchers = e.getValue();
373                 synchronized (watchers) {
374                     for (Integer wbit : watchers) {
375                         Watcher w = watcherBitIdMap.get(wbit);
376                         if (!(w instanceof ServerCnxn)) {
377                             continue;
378                         }
379                         pwriter.print("\t0x");
380                         pwriter.print(Long.toHexString(((ServerCnxn) w).getSessionId()));
381                         pwriter.print("\n");
382                     }
383                 }
384             }
385         } else {
386             for (Entry<Watcher, Set<String>> e : getWatcher2PathesMap().entrySet()) {
387                 pwriter.print("0x");
388                 pwriter.println(Long.toHexString(((ServerCnxn) e.getKey()).getSessionId()));
389                 for (String path : e.getValue()) {
390                     pwriter.print("\t");
391                     pwriter.println(path);
392                 }
393             }
394         }
395     }
396 
397     @Override
toString()398     public String toString() {
399         StringBuilder sb = new StringBuilder();
400         sb.append(watcherBitIdMap.size()).append(" connections watching ").append(pathSize()).append(" paths\n");
401         sb.append("Total watches:").append(size());
402         return sb.toString();
403     }
404 
405 }
406