1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.net.unix;
19 
20 import java.io.Closeable;
21 import java.io.EOFException;
22 
23 import org.apache.hadoop.classification.InterfaceAudience;
24 import org.apache.hadoop.io.IOUtils;
25 
26 import java.io.IOException;
27 import java.nio.channels.ClosedChannelException;
28 import java.util.Iterator;
29 import java.util.LinkedList;
30 import java.util.TreeMap;
31 import java.util.Map;
32 import java.util.concurrent.locks.Condition;
33 import java.util.concurrent.locks.ReentrantLock;
34 
35 import org.apache.commons.lang.SystemUtils;
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.util.NativeCodeLoader;
39 
40 import com.google.common.annotations.VisibleForTesting;
41 import com.google.common.base.Preconditions;
42 import com.google.common.util.concurrent.Uninterruptibles;
43 
44 /**
45  * The DomainSocketWatcher watches a set of domain sockets to see when they
46  * become readable, or closed.  When one of those events happens, it makes a
47  * callback.
48  *
49  * See {@link DomainSocket} for more information about UNIX domain sockets.
50  */
51 @InterfaceAudience.LimitedPrivate("HDFS")
52 public final class DomainSocketWatcher implements Closeable {
53   static {
54     if (SystemUtils.IS_OS_WINDOWS) {
55       loadingFailureReason = "UNIX Domain sockets are not available on Windows.";
56     } else if (!NativeCodeLoader.isNativeCodeLoaded()) {
57       loadingFailureReason = "libhadoop cannot be loaded.";
58     } else {
59       String problem;
60       try {
anchorNative()61         anchorNative();
62         problem = null;
63       } catch (Throwable t) {
64         problem = "DomainSocketWatcher#anchorNative got error: " +
65           t.getMessage();
66       }
67       loadingFailureReason = problem;
68     }
69   }
70 
  /** Logger shared by every DomainSocketWatcher and its nested classes. */
  static Log LOG = LogFactory.getLog(DomainSocketWatcher.class);

  /**
   * The reason why DomainSocketWatcher is not available, or null if it is
   * available.  Assigned exactly once, by the static initializer of this
   * class.
   */
  private final static String loadingFailureReason;

  /**
   * Initializes the native library code.  Called from the static initializer
   * once the platform and libhadoop checks have passed.
   */
  private static native void anchorNative();

  /**
   * Returns the reason why DomainSocketWatcher cannot be used in this JVM.
   *
   * @return  A human-readable explanation, or null if the class is usable.
   *          The constructor throws UnsupportedOperationException with this
   *          message whenever it is non-null.
   */
  public static String getLoadingFailureReason() {
    return loadingFailureReason;
  }
87 
  /**
   * Callback supplied by users of the watcher, one per watched socket.
   *
   * Within this class the handler is invoked while the watcher's lock is
   * held: from the watcher thread when the socket is reported readable, when
   * the socket is removed, and when the watcher shuts down; and directly
   * from add() when the watcher or the socket is already closed.
   */
  public interface Handler {
    /**
     * Handles an event on a socket.  An event may be the socket becoming
     * readable, or the remote end being closed.
     *
     * @param sock    The socket that the event occurred on.
     * @return        Whether we should close the socket.
     */
    boolean handle(DomainSocket sock);
  }
98 
99   /**
100    * Handler for {DomainSocketWatcher#notificationSockets[1]}
101    */
102   private class NotificationHandler implements Handler {
handle(DomainSocket sock)103     public boolean handle(DomainSocket sock) {
104       assert(lock.isHeldByCurrentThread());
105       try {
106         kicked = false;
107         if (LOG.isTraceEnabled()) {
108           LOG.trace(this + ": NotificationHandler: doing a read on " +
109             sock.fd);
110         }
111         if (sock.getInputStream().read() == -1) {
112           if (LOG.isTraceEnabled()) {
113             LOG.trace(this + ": NotificationHandler: got EOF on " + sock.fd);
114           }
115           throw new EOFException();
116         }
117         if (LOG.isTraceEnabled()) {
118           LOG.trace(this + ": NotificationHandler: read succeeded on " +
119             sock.fd);
120         }
121         return false;
122       } catch (IOException e) {
123         if (LOG.isTraceEnabled()) {
124           LOG.trace(this + ": NotificationHandler: setting closed to " +
125               "true for " + sock.fd);
126         }
127         closed = true;
128         return true;
129       }
130     }
131   }
132 
133   private static class Entry {
134     final DomainSocket socket;
135     final Handler handler;
136 
Entry(DomainSocket socket, Handler handler)137     Entry(DomainSocket socket, Handler handler) {
138       this.socket = socket;
139       this.handler = handler;
140     }
141 
getDomainSocket()142     DomainSocket getDomainSocket() {
143       return socket;
144     }
145 
getHandler()146     Handler getHandler() {
147       return handler;
148     }
149   }
150 
  /**
   * The FdSet is a set of file descriptors that gets passed to poll(2).
   * It contains a native memory segment, so that we don't have to copy
   * in the poll0 function.
   *
   * Everything except construction is implemented in native code.  Within
   * this file an FdSet is created, used and closed only from the watcher
   * thread's run() method and the private helpers it calls.
   */
  private static class FdSet {
    // Value returned by alloc0().  It is never read from Java in this file;
    // presumably it is the handle to the native memory segment and is
    // accessed by the native methods -- confirm against the JNI code before
    // renaming or removing it.
    private long data;

    /**
     * Allocates the native state for a new set.
     *
     * @return     The value to store in {@code data}.
     */
    private native static long alloc0();

    /** Creates an empty set backed by freshly allocated native state. */
    FdSet() {
      data = alloc0();
    }

    /**
     * Add a file descriptor to the set.
     *
     * @param fd   The file descriptor to add.
     */
    native void add(int fd);

    /**
     * Remove a file descriptor from the set.
     *
     * @param fd   The file descriptor to remove.
     */
    native void remove(int fd);

    /**
     * Get an array containing all the FDs marked as readable.
     * Also clear the state of all FDs.
     *
     * @return     An array containing all of the currently readable file
     *             descriptors.
     */
    native int[] getAndClearReadableFds();

    /**
     * Close the object and de-allocate the memory used.
     */
    native void close();
  }
193 
  /**
   * Lock which protects toAdd, toRemove, closed, and kicked.
   */
  private final ReentrantLock lock = new ReentrantLock();

  /**
   * Condition variable which indicates that toAdd and toRemove have been
   * processed.  Signalled by the watcher thread; awaited by add() and
   * remove().
   */
  private final Condition processedCond = lock.newCondition();

  /**
   * Entries to add.  Filled by add() and drained by the watcher thread.
   */
  private final LinkedList<Entry> toAdd =
      new LinkedList<Entry>();

  /**
   * Entries to remove, keyed by file descriptor.  Filled by remove() and
   * emptied by the watcher thread as it runs the corresponding callbacks.
   */
  private final TreeMap<Integer, DomainSocket> toRemove =
      new TreeMap<Integer, DomainSocket>();

  /**
   * Maximum length of time to go between checking whether the interrupted
   * bit has been set for this thread.  Passed to doPoll0 as its maximum
   * wait, in milliseconds.
   */
  private final int interruptCheckPeriodMs;

  /**
   * A pair of sockets used to wake up the thread after it has called poll(2).
   * kick() writes a byte to element 0 and close() closes it; element 1 is
   * watched by the watcher thread through a NotificationHandler.
   */
  private final DomainSocket notificationSockets[];

  /**
   * Whether or not this DomainSocketWatcher is closed.
   */
  private boolean closed = false;

  /**
   * True if we have written a byte to the notification socket. We should not
   * write anything else to the socket until the notification handler has had a
   * chance to run. Otherwise, our thread might block, causing deadlock.
   * See HADOOP-11333 for details.
   */
  private boolean kicked = false;
240 
DomainSocketWatcher(int interruptCheckPeriodMs, String src)241   public DomainSocketWatcher(int interruptCheckPeriodMs, String src)
242       throws IOException {
243     if (loadingFailureReason != null) {
244       throw new UnsupportedOperationException(loadingFailureReason);
245     }
246     Preconditions.checkArgument(interruptCheckPeriodMs > 0);
247     this.interruptCheckPeriodMs = interruptCheckPeriodMs;
248     notificationSockets = DomainSocket.socketpair();
249     watcherThread.setDaemon(true);
250     watcherThread.setName(src + " DomainSocketWatcher");
251     watcherThread
252         .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
253           @Override
254           public void uncaughtException(Thread thread, Throwable t) {
255             LOG.error(thread + " terminating on unexpected exception", t);
256           }
257         });
258     watcherThread.start();
259   }
260 
261   /**
262    * Close the DomainSocketWatcher and wait for its thread to terminate.
263    *
264    * If there is more than one close, all but the first will be ignored.
265    */
266   @Override
close()267   public void close() throws IOException {
268     lock.lock();
269     try {
270       if (closed) return;
271       if (LOG.isDebugEnabled()) {
272         LOG.debug(this + ": closing");
273       }
274       closed = true;
275     } finally {
276       lock.unlock();
277     }
278     // Close notificationSockets[0], so that notificationSockets[1] gets an EOF
279     // event.  This will wake up the thread immediately if it is blocked inside
280     // the select() system call.
281     notificationSockets[0].close();
282     // Wait for the select thread to terminate.
283     Uninterruptibles.joinUninterruptibly(watcherThread);
284   }
285 
286   @VisibleForTesting
isClosed()287   public boolean isClosed() {
288     lock.lock();
289     try {
290       return closed;
291     } finally {
292       lock.unlock();
293     }
294   }
295 
296   /**
297    * Add a socket.
298    *
299    * @param sock     The socket to add.  It is an error to re-add a socket that
300    *                   we are already watching.
301    * @param handler  The handler to associate with this socket.  This may be
302    *                   called any time after this function is called.
303    */
add(DomainSocket sock, Handler handler)304   public void add(DomainSocket sock, Handler handler) {
305     lock.lock();
306     try {
307       if (closed) {
308         handler.handle(sock);
309         IOUtils.cleanup(LOG, sock);
310         return;
311       }
312       Entry entry = new Entry(sock, handler);
313       try {
314         sock.refCount.reference();
315       } catch (ClosedChannelException e1) {
316         // If the socket is already closed before we add it, invoke the
317         // handler immediately.  Then we're done.
318         handler.handle(sock);
319         return;
320       }
321       toAdd.add(entry);
322       kick();
323       while (true) {
324         try {
325           processedCond.await();
326         } catch (InterruptedException e) {
327           Thread.currentThread().interrupt();
328         }
329         if (!toAdd.contains(entry)) {
330           break;
331         }
332       }
333     } finally {
334       lock.unlock();
335     }
336   }
337 
338   /**
339    * Remove a socket.  Its handler will be called.
340    *
341    * @param sock     The socket to remove.
342    */
remove(DomainSocket sock)343   public void remove(DomainSocket sock) {
344     lock.lock();
345     try {
346       if (closed) return;
347       toRemove.put(sock.fd, sock);
348       kick();
349       while (true) {
350         try {
351           processedCond.await();
352         } catch (InterruptedException e) {
353           Thread.currentThread().interrupt();
354         }
355         if (!toRemove.containsKey(sock.fd)) {
356           break;
357         }
358       }
359     } finally {
360       lock.unlock();
361     }
362   }
363 
364   /**
365    * Wake up the DomainSocketWatcher thread.
366    */
kick()367   private void kick() {
368     assert(lock.isHeldByCurrentThread());
369 
370     if (kicked) {
371       return;
372     }
373 
374     try {
375       notificationSockets[0].getOutputStream().write(0);
376       kicked = true;
377     } catch (IOException e) {
378       if (!closed) {
379         LOG.error(this + ": error writing to notificationSockets[0]", e);
380       }
381     }
382   }
383 
384   /**
385    * Send callback and return whether or not the domain socket was closed as a
386    * result of processing.
387    *
388    * @param caller reason for call
389    * @param entries mapping of file descriptor to entry
390    * @param fdSet set of file descriptors
391    * @param fd file descriptor
392    * @return true if the domain socket was closed as a result of processing
393    */
sendCallback(String caller, TreeMap<Integer, Entry> entries, FdSet fdSet, int fd)394   private boolean sendCallback(String caller, TreeMap<Integer, Entry> entries,
395       FdSet fdSet, int fd) {
396     if (LOG.isTraceEnabled()) {
397       LOG.trace(this + ": " + caller + " starting sendCallback for fd " + fd);
398     }
399     Entry entry = entries.get(fd);
400     Preconditions.checkNotNull(entry,
401         this + ": fdSet contained " + fd + ", which we were " +
402         "not tracking.");
403     DomainSocket sock = entry.getDomainSocket();
404     if (entry.getHandler().handle(sock)) {
405       if (LOG.isTraceEnabled()) {
406         LOG.trace(this + ": " + caller + ": closing fd " + fd +
407             " at the request of the handler.");
408       }
409       if (toRemove.remove(fd) != null) {
410         if (LOG.isTraceEnabled()) {
411           LOG.trace(this + ": " + caller + " : sendCallback processed fd " +
412             fd  + " in toRemove.");
413         }
414       }
415       try {
416         sock.refCount.unreferenceCheckClosed();
417       } catch (IOException e) {
418         Preconditions.checkArgument(false,
419             this + ": file descriptor " + sock.fd + " was closed while " +
420             "still in the poll(2) loop.");
421       }
422       IOUtils.cleanup(LOG, sock);
423       fdSet.remove(fd);
424       return true;
425     } else {
426       if (LOG.isTraceEnabled()) {
427         LOG.trace(this + ": " + caller + ": sendCallback not " +
428             "closing fd " + fd);
429       }
430       return false;
431     }
432   }
433 
434   /**
435    * Send callback, and if the domain socket was closed as a result of
436    * processing, then also remove the entry for the file descriptor.
437    *
438    * @param caller reason for call
439    * @param entries mapping of file descriptor to entry
440    * @param fdSet set of file descriptors
441    * @param fd file descriptor
442    */
sendCallbackAndRemove(String caller, TreeMap<Integer, Entry> entries, FdSet fdSet, int fd)443   private void sendCallbackAndRemove(String caller,
444       TreeMap<Integer, Entry> entries, FdSet fdSet, int fd) {
445     if (sendCallback(caller, entries, fdSet, fd)) {
446       entries.remove(fd);
447     }
448   }
449 
450   @VisibleForTesting
451   final Thread watcherThread = new Thread(new Runnable() {
452     @Override
453     public void run() {
454       if (LOG.isDebugEnabled()) {
455         LOG.debug(this + ": starting with interruptCheckPeriodMs = " +
456             interruptCheckPeriodMs);
457       }
458       final TreeMap<Integer, Entry> entries = new TreeMap<Integer, Entry>();
459       FdSet fdSet = new FdSet();
460       addNotificationSocket(entries, fdSet);
461       try {
462         while (true) {
463           lock.lock();
464           try {
465             for (int fd : fdSet.getAndClearReadableFds()) {
466               sendCallbackAndRemove("getAndClearReadableFds", entries, fdSet,
467                   fd);
468             }
469             if (!(toAdd.isEmpty() && toRemove.isEmpty())) {
470               // Handle pending additions (before pending removes).
471               for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext(); ) {
472                 Entry entry = iter.next();
473                 DomainSocket sock = entry.getDomainSocket();
474                 Entry prevEntry = entries.put(sock.fd, entry);
475                 Preconditions.checkState(prevEntry == null,
476                     this + ": tried to watch a file descriptor that we " +
477                     "were already watching: " + sock);
478                 if (LOG.isTraceEnabled()) {
479                   LOG.trace(this + ": adding fd " + sock.fd);
480                 }
481                 fdSet.add(sock.fd);
482                 iter.remove();
483               }
484               // Handle pending removals
485               while (true) {
486                 Map.Entry<Integer, DomainSocket> entry = toRemove.firstEntry();
487                 if (entry == null) break;
488                 sendCallbackAndRemove("handlePendingRemovals",
489                     entries, fdSet, entry.getValue().fd);
490               }
491               processedCond.signalAll();
492             }
493             // Check if the thread should terminate.  Doing this check now is
494             // easier than at the beginning of the loop, since we know toAdd and
495             // toRemove are now empty and processedCond has been notified if it
496             // needed to be.
497             if (closed) {
498               if (LOG.isDebugEnabled()) {
499                 LOG.debug(toString() + " thread terminating.");
500               }
501               return;
502             }
503             // Check if someone sent our thread an InterruptedException while we
504             // were waiting in poll().
505             if (Thread.interrupted()) {
506               throw new InterruptedException();
507             }
508           } finally {
509             lock.unlock();
510           }
511           doPoll0(interruptCheckPeriodMs, fdSet);
512         }
513       } catch (InterruptedException e) {
514         LOG.info(toString() + " terminating on InterruptedException");
515       } catch (Throwable e) {
516         LOG.error(toString() + " terminating on exception", e);
517       } finally {
518         lock.lock();
519         try {
520           kick(); // allow the handler for notificationSockets[0] to read a byte
521           for (Entry entry : entries.values()) {
522             // We do not remove from entries as we iterate, because that can
523             // cause a ConcurrentModificationException.
524             sendCallback("close", entries, fdSet, entry.getDomainSocket().fd);
525           }
526           entries.clear();
527           fdSet.close();
528         } finally {
529           lock.unlock();
530         }
531       }
532     }
533   });
534 
addNotificationSocket(final TreeMap<Integer, Entry> entries, FdSet fdSet)535   private void addNotificationSocket(final TreeMap<Integer, Entry> entries,
536       FdSet fdSet) {
537     entries.put(notificationSockets[1].fd,
538         new Entry(notificationSockets[1], new NotificationHandler()));
539     try {
540       notificationSockets[1].refCount.reference();
541     } catch (IOException e) {
542       throw new RuntimeException(e);
543     }
544     fdSet.add(notificationSockets[1].fd);
545     if (LOG.isTraceEnabled()) {
546       LOG.trace(this + ": adding notificationSocket " +
547           notificationSockets[1].fd + ", connected to " +
548           notificationSockets[0].fd);
549     }
550   }
551 
toString()552   public String toString() {
553     return "DomainSocketWatcher(" + System.identityHashCode(this) + ")";
554   }
555 
  /**
   * Native poll over the descriptors in an FdSet.  The watcher thread calls
   * this once per loop iteration, outside the lock, and afterwards collects
   * the result through FdSet#getAndClearReadableFds().
   *
   * @param maxWaitMs  Maximum time to wait; the watcher thread passes
   *                   interruptCheckPeriodMs (milliseconds).
   * @param readFds    The set of descriptors to poll.
   * @return           Not used by any caller in this file; its meaning is
   *                   defined by the native implementation -- TODO confirm.
   * @throws IOException  Raised by the native implementation; the watcher
   *                   thread treats it as fatal and shuts down.
   */
  private static native int doPoll0(int maxWaitMs, FdSet readFds)
      throws IOException;
558 }
559