/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.net.unix;

import java.io.Closeable;
import java.io.EOFException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.IOUtils;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.TreeMap;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.lang.SystemUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.NativeCodeLoader;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;

/**
 * The DomainSocketWatcher watches a set of domain sockets to see when they
 * become readable, or closed.  When one of those events happens, it makes a
 * callback.
 *
 * A single daemon thread ({@link #watcherThread}) polls the watched file
 * descriptors.  {@link #add} and {@link #remove} hand their request to that
 * thread and block until it has been processed.  Once the watcher is closed,
 * or its thread has terminated for any reason, such requests are completed
 * immediately instead of being queued.
 *
 * See {@link DomainSocket} for more information about UNIX domain sockets.
 */
@InterfaceAudience.LimitedPrivate("HDFS")
public final class DomainSocketWatcher implements Closeable {
  static {
    if (SystemUtils.IS_OS_WINDOWS) {
      loadingFailureReason = "UNIX Domain sockets are not available on Windows.";
    } else if (!NativeCodeLoader.isNativeCodeLoaded()) {
      loadingFailureReason = "libhadoop cannot be loaded.";
    } else {
      String problem;
      try {
        anchorNative();
        problem = null;
      } catch (Throwable t) {
        problem = "DomainSocketWatcher#anchorNative got error: " +
          t.getMessage();
      }
      loadingFailureReason = problem;
    }
  }

  static Log LOG = LogFactory.getLog(DomainSocketWatcher.class);

  /**
   * The reason why DomainSocketWatcher is not available, or null if it is
   * available.
   */
  private final static String loadingFailureReason;

  /**
   * Initializes the native library code.
   */
  private static native void anchorNative();

  /**
   * @return The reason why DomainSocketWatcher cannot be used on this
   *         platform, or null if it is available.
   */
  public static String getLoadingFailureReason() {
    return loadingFailureReason;
  }

  public interface Handler {
    /**
     * Handles an event on a socket.  An event may be the socket becoming
     * readable, or the remote end being closed.
     *
     * @param sock    The socket that the event occurred on.
     * @return        Whether we should close the socket.
     */
    boolean handle(DomainSocket sock);
  }

  /**
   * Handler for {@code notificationSockets[1]}, the read end of the socket
   * pair used to wake up the watcher thread.
   */
  private class NotificationHandler implements Handler {
    /**
     * Consumes the single wake-up byte written by {@link #kick()}.
     *
     * @param sock    The notification socket.
     * @return        false if a byte was read; true if the read hit EOF or
     *                failed, in which case the watcher is also marked closed.
     */
    @Override
    public boolean handle(DomainSocket sock) {
      assert(lock.isHeldByCurrentThread());
      try {
        // The pending byte is being consumed, so another kick is allowed.
        kicked = false;
        if (LOG.isTraceEnabled()) {
          LOG.trace(this + ": NotificationHandler: doing a read on " +
            sock.fd);
        }
        if (sock.getInputStream().read() == -1) {
          if (LOG.isTraceEnabled()) {
            LOG.trace(this + ": NotificationHandler: got EOF on " + sock.fd);
          }
          throw new EOFException();
        }
        if (LOG.isTraceEnabled()) {
          LOG.trace(this + ": NotificationHandler: read succeeded on " +
            sock.fd);
        }
        return false;
      } catch (IOException e) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(this + ": NotificationHandler: setting closed to " +
              "true for " + sock.fd);
        }
        closed = true;
        return true;
      }
    }
  }

  /**
   * A watched socket paired with the handler to invoke for its events.
   */
  private static class Entry {
    final DomainSocket socket;
    final Handler handler;

    Entry(DomainSocket socket, Handler handler) {
      this.socket = socket;
      this.handler = handler;
    }

    DomainSocket getDomainSocket() {
      return socket;
    }

    Handler getHandler() {
      return handler;
    }
  }

  /**
   * The FdSet is a set of file descriptors that gets passed to poll(2).
   * It contains a native memory segment, so that we don't have to copy
   * in the poll0 function.
   */
  private static class FdSet {
    // Value returned by alloc0(); presumably the address of the native
    // memory segment -- confirm against the native implementation.
    private long data;

    private native static long alloc0();

    FdSet() {
      data = alloc0();
    }

    /**
     * Add a file descriptor to the set.
     *
     * @param fd   The file descriptor to add.
     */
    native void add(int fd);

    /**
     * Remove a file descriptor from the set.
     *
     * @param fd   The file descriptor to remove.
     */
    native void remove(int fd);

    /**
     * Get an array containing all the FDs marked as readable.
     * Also clear the state of all FDs.
     *
     * @return     An array containing all of the currently readable file
     *             descriptors.
     */
    native int[] getAndClearReadableFds();

    /**
     * Close the object and de-allocate the memory used.
     */
    native void close();
  }

  /**
   * Lock which protects toAdd, toRemove, and closed.
   */
  private final ReentrantLock lock = new ReentrantLock();

  /**
   * Condition variable which indicates that toAdd and toRemove have been
   * processed.
   */
  private final Condition processedCond = lock.newCondition();

  /**
   * Entries to add.
   */
  private final LinkedList<Entry> toAdd =
      new LinkedList<Entry>();

  /**
   * Entries to remove.
   */
  private final TreeMap<Integer, DomainSocket> toRemove =
      new TreeMap<Integer, DomainSocket>();

  /**
   * Maximum length of time to go between checking whether the interrupted
   * bit has been set for this thread.
   */
  private final int interruptCheckPeriodMs;

  /**
   * A pair of sockets used to wake up the thread after it has called poll(2).
   */
  private final DomainSocket notificationSockets[];

  /**
   * Whether or not this DomainSocketWatcher is closed.  This becomes true
   * when close() is called, when the notification socket fails, or when the
   * watcher thread terminates.
   */
  private boolean closed = false;

  /**
   * True if we have written a byte to the notification socket.  We should not
   * write anything else to the socket until the notification handler has had a
   * chance to run.  Otherwise, our thread might block, causing deadlock.
   * See HADOOP-11333 for details.
   */
  private boolean kicked = false;

  /**
   * Create a DomainSocketWatcher and start its daemon watcher thread.
   *
   * @param interruptCheckPeriodMs  Maximum time, in milliseconds, between
   *                                checks of the watcher thread's interrupted
   *                                bit.  Must be positive.
   * @param src                     Prefix used in the watcher thread's name.
   * @throws IOException                    If the notification socket pair
   *                                        cannot be created.
   * @throws UnsupportedOperationException  If DomainSocketWatcher is not
   *                                        available; see
   *                                        {@link #getLoadingFailureReason()}.
   * @throws IllegalArgumentException       If interruptCheckPeriodMs is not
   *                                        positive.
   */
  public DomainSocketWatcher(int interruptCheckPeriodMs, String src)
      throws IOException {
    if (loadingFailureReason != null) {
      throw new UnsupportedOperationException(loadingFailureReason);
    }
    Preconditions.checkArgument(interruptCheckPeriodMs > 0);
    this.interruptCheckPeriodMs = interruptCheckPeriodMs;
    notificationSockets = DomainSocket.socketpair();
    watcherThread.setDaemon(true);
    watcherThread.setName(src + " DomainSocketWatcher");
    watcherThread
        .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
          @Override
          public void uncaughtException(Thread thread, Throwable t) {
            LOG.error(thread + " terminating on unexpected exception", t);
          }
        });
    watcherThread.start();
  }

  /**
   * Close the DomainSocketWatcher and wait for its thread to terminate.
   *
   * If there is more than one close, all but the first will be ignored.
   * The call is also ignored if the watcher thread has already terminated
   * on its own, since it marks the watcher closed and releases
   * notificationSockets[0] itself in that case.
   */
  @Override
  public void close() throws IOException {
    lock.lock();
    try {
      if (closed) return;
      if (LOG.isDebugEnabled()) {
        LOG.debug(this + ": closing");
      }
      closed = true;
    } finally {
      lock.unlock();
    }
    // Close notificationSockets[0], so that notificationSockets[1] gets an EOF
    // event.  This will wake up the thread immediately if it is blocked inside
    // the select() system call.
    notificationSockets[0].close();
    // Wait for the select thread to terminate.
    Uninterruptibles.joinUninterruptibly(watcherThread);
  }

  /**
   * @return Whether this watcher has been marked closed.
   */
  @VisibleForTesting
  public boolean isClosed() {
    lock.lock();
    try {
      return closed;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Add a socket.
   *
   * Blocks until the watcher thread has picked up the request.  If the
   * watcher is already closed, or the socket is already closed, the handler
   * is invoked immediately on the calling thread instead.
   *
   * @param sock     The socket to add.  It is an error to re-add a socket that
   *                   we are already watching.
   * @param handler  The handler to associate with this socket.  This may be
   *                   called any time after this function is called.
   */
  public void add(DomainSocket sock, Handler handler) {
    lock.lock();
    try {
      if (closed) {
        handler.handle(sock);
        IOUtils.cleanup(LOG, sock);
        return;
      }
      Entry entry = new Entry(sock, handler);
      try {
        sock.refCount.reference();
      } catch (ClosedChannelException e1) {
        // If the socket is already closed before we add it, invoke the
        // handler immediately.  Then we're done.
        handler.handle(sock);
        return;
      }
      toAdd.add(entry);
      kick();
      // Wait without responding to interrupts.  Catching
      // InterruptedException and re-asserting the flag here would make every
      // later await() throw straight away without releasing the lock, so the
      // watcher thread could never run and this loop would spin forever.
      // awaitUninterruptibly() leaves the interrupt status set for the caller.
      do {
        processedCond.awaitUninterruptibly();
      } while (toAdd.contains(entry));
    } finally {
      lock.unlock();
    }
  }

  /**
   * Remove a socket.  Its handler will be called.
   *
   * Blocks until the watcher thread has picked up the request.  Does nothing
   * if the watcher is already closed.
   *
   * @param sock     The socket to remove.
   */
  public void remove(DomainSocket sock) {
    lock.lock();
    try {
      if (closed) return;
      toRemove.put(sock.fd, sock);
      kick();
      // Uninterruptible for the same reason as in add().
      do {
        processedCond.awaitUninterruptibly();
      } while (toRemove.containsKey(sock.fd));
    } finally {
      lock.unlock();
    }
  }

  /**
   * Wake up the DomainSocketWatcher thread.
   *
   * Writes one byte to notificationSockets[0] unless a previous byte is still
   * waiting to be read.  The caller must hold the lock.
   */
  private void kick() {
    assert(lock.isHeldByCurrentThread());

    if (kicked) {
      return;
    }

    try {
      notificationSockets[0].getOutputStream().write(0);
      kicked = true;
    } catch (IOException e) {
      // A failed write is expected once the watcher has been closed.
      if (!closed) {
        LOG.error(this + ": error writing to notificationSockets[0]", e);
      }
    }
  }

  /**
   * Send callback and return whether or not the domain socket was closed as a
   * result of processing.
   *
   * @param caller reason for call
   * @param entries mapping of file descriptor to entry
   * @param fdSet set of file descriptors
   * @param fd file descriptor
   * @return true if the domain socket was closed as a result of processing
   */
  private boolean sendCallback(String caller, TreeMap<Integer, Entry> entries,
      FdSet fdSet, int fd) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": " + caller + " starting sendCallback for fd " + fd);
    }
    Entry entry = entries.get(fd);
    Preconditions.checkNotNull(entry,
        this + ": fdSet contained " + fd + ", which we were " +
        "not tracking.");
    DomainSocket sock = entry.getDomainSocket();
    if (entry.getHandler().handle(sock)) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": " + caller + ": closing fd " + fd +
            " at the request of the handler.");
      }
      // Closing the socket also satisfies any pending remove() for it.
      if (toRemove.remove(fd) != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(this + ": " + caller + " : sendCallback processed fd " +
            fd  + " in toRemove.");
        }
      }
      try {
        sock.refCount.unreferenceCheckClosed();
      } catch (IOException e) {
        Preconditions.checkArgument(false,
            this + ": file descriptor " + sock.fd + " was closed while " +
            "still in the poll(2) loop.");
      }
      IOUtils.cleanup(LOG, sock);
      fdSet.remove(fd);
      return true;
    } else {
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": " + caller + ": sendCallback not " +
            "closing fd " + fd);
      }
      return false;
    }
  }

  /**
   * Send callback, and if the domain socket was closed as a result of
   * processing, then also remove the entry for the file descriptor.
   *
   * @param caller reason for call
   * @param entries mapping of file descriptor to entry
   * @param fdSet set of file descriptors
   * @param fd file descriptor
   */
  private void sendCallbackAndRemove(String caller,
      TreeMap<Integer, Entry> entries, FdSet fdSet, int fd) {
    if (sendCallback(caller, entries, fdSet, fd)) {
      entries.remove(fd);
    }
  }

  /**
   * Complete every request that the watcher thread will never get to process
   * because it is terminating.  The caller must hold the lock and must
   * already have set closed to true.
   *
   * The queues are emptied and the waiters signalled before any handler
   * runs, so a handler that throws cannot leave add() or remove() callers
   * blocked.  The sockets still queued for addition are then treated the way
   * add() treats a socket handed to a closed watcher: the handler is invoked
   * and the socket is closed.
   */
  private void abandonPendingRequests() {
    assert(lock.isHeldByCurrentThread());
    LinkedList<Entry> orphans = new LinkedList<Entry>(toAdd);
    toAdd.clear();
    toRemove.clear();
    processedCond.signalAll();
    for (Entry orphan : orphans) {
      DomainSocket sock = orphan.getDomainSocket();
      try {
        // Drop the reference that add() took on our behalf.
        sock.refCount.unreferenceCheckClosed();
      } catch (IOException e) {
        // Presumably the socket was closed while it sat in the queue; the
        // handler is still notified below.
        if (LOG.isTraceEnabled()) {
          LOG.trace(this + ": fd " + sock.fd + " was closed before it " +
              "could be watched.");
        }
      }
      orphan.getHandler().handle(sock);
      IOUtils.cleanup(LOG, sock);
    }
  }

  /**
   * The thread which polls the watched file descriptors, invokes handlers,
   * and processes the toAdd and toRemove queues.
   */
  @VisibleForTesting
  final Thread watcherThread = new Thread(new Runnable() {
    @Override
    public void run() {
      if (LOG.isDebugEnabled()) {
        LOG.debug(this + ": starting with interruptCheckPeriodMs = " +
            interruptCheckPeriodMs);
      }
      final TreeMap<Integer, Entry> entries = new TreeMap<Integer, Entry>();
      FdSet fdSet = new FdSet();
      addNotificationSocket(entries, fdSet);
      try {
        while (true) {
          lock.lock();
          try {
            for (int fd : fdSet.getAndClearReadableFds()) {
              sendCallbackAndRemove("getAndClearReadableFds", entries, fdSet,
                  fd);
            }
            if (!(toAdd.isEmpty() && toRemove.isEmpty())) {
              // Handle pending additions (before pending removes).
              for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext(); ) {
                Entry entry = iter.next();
                DomainSocket sock = entry.getDomainSocket();
                Entry prevEntry = entries.put(sock.fd, entry);
                Preconditions.checkState(prevEntry == null,
                    this + ": tried to watch a file descriptor that we " +
                    "were already watching: " + sock);
                if (LOG.isTraceEnabled()) {
                  LOG.trace(this + ": adding fd " + sock.fd);
                }
                fdSet.add(sock.fd);
                iter.remove();
              }
              // Handle pending removals
              while (true) {
                Map.Entry<Integer, DomainSocket> entry = toRemove.firstEntry();
                if (entry == null) break;
                sendCallbackAndRemove("handlePendingRemovals",
                    entries, fdSet, entry.getValue().fd);
              }
              processedCond.signalAll();
            }
            // Check if the thread should terminate.  Doing this check now is
            // easier than at the beginning of the loop, since we know toAdd and
            // toRemove are now empty and processedCond has been notified if it
            // needed to be.
            if (closed) {
              if (LOG.isDebugEnabled()) {
                LOG.debug(toString() + " thread terminating.");
              }
              return;
            }
            // Check if someone sent our thread an InterruptedException while we
            // were waiting in poll().
            if (Thread.interrupted()) {
              throw new InterruptedException();
            }
          } finally {
            lock.unlock();
          }
          doPoll0(interruptCheckPeriodMs, fdSet);
        }
      } catch (InterruptedException e) {
        LOG.info(toString() + " terminating on InterruptedException");
      } catch (Throwable e) {
        LOG.error(toString() + " terminating on exception", e);
      } finally {
        lock.lock();
        try {
          // Distinguish a shutdown requested through close() from the thread
          // dying on its own (interrupt, poll failure, failed precondition).
          final boolean closeRequested = closed;
          try {
            kick(); // allow the handler for notificationSockets[0] to read a byte
            for (Entry entry : entries.values()) {
              // We do not remove from entries as we iterate, because that can
              // cause a ConcurrentModificationException.
              sendCallback("close", entries, fdSet, entry.getDomainSocket().fd);
            }
            entries.clear();
            fdSet.close();
          } finally {
            // Nothing will service toAdd/toRemove from here on.  Mark the
            // watcher closed so later add()/remove() calls take their
            // closed path instead of queueing work and waiting forever.
            closed = true;
            if (!closeRequested) {
              // close() now returns early, so it will never close the write
              // end of the notification pair; release it here.
              IOUtils.cleanup(LOG, notificationSockets[0]);
            }
            // Wake up and complete any callers that were already waiting.
            abandonPendingRequests();
          }
        } finally {
          lock.unlock();
        }
      }
    }
  });

  /**
   * Start watching notificationSockets[1] so that a kick() or a close() wakes
   * up the watcher thread.
   *
   * @param entries mapping of file descriptor to entry
   * @param fdSet set of file descriptors
   */
  private void addNotificationSocket(final TreeMap<Integer, Entry> entries,
      FdSet fdSet) {
    entries.put(notificationSockets[1].fd,
        new Entry(notificationSockets[1], new NotificationHandler()));
    try {
      notificationSockets[1].refCount.reference();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    fdSet.add(notificationSockets[1].fd);
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": adding notificationSocket " +
          notificationSockets[1].fd + ", connected to " +
          notificationSockets[0].fd);
    }
  }

  /**
   * @return A short identifier for this watcher, used in log messages.
   */
  @Override
  public String toString() {
    return "DomainSocketWatcher(" + System.identityHashCode(this) + ")";
  }

  /**
   * Native poll(2) wrapper.
   *
   * @param maxWaitMs  Maximum time to wait, in milliseconds.
   * @param readFds    The file descriptors to poll for readability.
   * @throws IOException presumably if the underlying poll fails; confirm
   *                     against the native implementation.
   */
  private static native int doPoll0(int maxWaitMs, FdSet readFds)
      throws IOException;
}