1 /* 2 * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.io.FileDescriptor; 29 import java.io.IOException; 30 import java.net.InetAddress; 31 import java.net.InetSocketAddress; 32 import java.net.ProtocolFamily; 33 import java.net.Socket; 34 import java.net.SocketAddress; 35 import java.net.SocketOption; 36 import java.net.StandardProtocolFamily; 37 import java.net.StandardSocketOptions; 38 import java.nio.ByteBuffer; 39 import java.nio.channels.AlreadyBoundException; 40 import java.nio.channels.AlreadyConnectedException; 41 import java.nio.channels.AsynchronousCloseException; 42 import java.nio.channels.ClosedChannelException; 43 import java.nio.channels.ConnectionPendingException; 44 import java.nio.channels.NoConnectionPendingException; 45 import java.nio.channels.NotYetConnectedException; 46 import java.nio.channels.SelectionKey; 47 import java.nio.channels.SocketChannel; 48 import java.nio.channels.spi.SelectorProvider; 49 import java.util.Collections; 50 import java.util.HashSet; 51 import java.util.Objects; 52 import java.util.Set; 53 import java.util.concurrent.locks.ReentrantLock; 54 55 import sun.net.NetHooks; 56 import sun.net.ext.ExtendedSocketOptions; 57 import sun.net.util.SocketExceptions; 58 import static sun.net.ext.ExtendedSocketOptions.SOCK_STREAM; 59 60 /** 61 * An implementation of SocketChannels 62 */ 63 64 class SocketChannelImpl 65 extends SocketChannel 66 implements SelChImpl 67 { 68 // Used to make native read and write calls 69 private static NativeDispatcher nd; 70 71 // Our file descriptor object 72 private final FileDescriptor fd; 73 private final int fdVal; 74 75 // Lock held by current reading or connecting thread 76 private final ReentrantLock readLock = new ReentrantLock(); 77 78 // Lock held by current writing or connecting thread 79 private final ReentrantLock writeLock = new ReentrantLock(); 80 81 // Lock held by any thread that modifies the state fields declared below 82 // DO NOT invoke a blocking I/O operation while holding this lock! 83 private final Object stateLock = new Object(); 84 85 // Input/Output closed 86 private volatile boolean isInputClosed; 87 private volatile boolean isOutputClosed; 88 89 // -- The following fields are protected by stateLock 90 91 // set true when exclusive binding is on and SO_REUSEADDR is emulated 92 private boolean isReuseAddress; 93 94 // State, increases monotonically 95 private static final int ST_UNCONNECTED = 0; 96 private static final int ST_CONNECTIONPENDING = 1; 97 private static final int ST_CONNECTED = 2; 98 private static final int ST_CLOSING = 3; 99 private static final int ST_KILLPENDING = 4; 100 private static final int ST_KILLED = 5; 101 private volatile int state; // need stateLock to change 102 103 // IDs of native threads doing reads and writes, for signalling 104 private long readerThread; 105 private long writerThread; 106 107 // Binding 108 private InetSocketAddress localAddress; 109 private InetSocketAddress remoteAddress; 110 111 // Socket adaptor, created on demand 112 private Socket socket; 113 114 // -- End of fields protected by stateLock 115 116 117 // Constructor for normal connecting sockets 118 // SocketChannelImpl(SelectorProvider sp)119 SocketChannelImpl(SelectorProvider sp) throws IOException { 120 super(sp); 121 this.fd = Net.socket(true); 122 this.fdVal = IOUtil.fdVal(fd); 123 } 124 SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)125 SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound) 126 throws IOException 127 { 128 super(sp); 129 this.fd = fd; 130 this.fdVal = IOUtil.fdVal(fd); 131 if (bound) { 132 synchronized (stateLock) { 133 this.localAddress = Net.localAddress(fd); 134 } 135 } 136 } 137 138 // Constructor for sockets obtained from server sockets 139 // SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa)140 SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa) 141 throws IOException 142 { 143 super(sp); 144 this.fd = fd; 145 this.fdVal = IOUtil.fdVal(fd); 146 synchronized (stateLock) { 147 this.localAddress = Net.localAddress(fd); 148 this.remoteAddress = isa; 149 this.state = ST_CONNECTED; 150 } 151 } 152 153 /** 154 * Checks that the channel is open. 155 * 156 * @throws ClosedChannelException if channel is closed (or closing) 157 */ ensureOpen()158 private void ensureOpen() throws ClosedChannelException { 159 if (!isOpen()) 160 throw new ClosedChannelException(); 161 } 162 163 /** 164 * Checks that the channel is open and connected. 165 * 166 * @apiNote This method uses the "state" field to check if the channel is 167 * open. It should never be used in conjuncion with isOpen or ensureOpen 168 * as these methods check AbstractInterruptibleChannel's closed field - that 169 * field is set before implCloseSelectableChannel is called and so before 170 * the state is changed. 171 * 172 * @throws ClosedChannelException if channel is closed (or closing) 173 * @throws NotYetConnectedException if open and not connected 174 */ ensureOpenAndConnected()175 private void ensureOpenAndConnected() throws ClosedChannelException { 176 int state = this.state; 177 if (state < ST_CONNECTED) { 178 throw new NotYetConnectedException(); 179 } else if (state > ST_CONNECTED) { 180 throw new ClosedChannelException(); 181 } 182 } 183 184 @Override socket()185 public Socket socket() { 186 synchronized (stateLock) { 187 if (socket == null) 188 socket = SocketAdaptor.create(this); 189 return socket; 190 } 191 } 192 193 @Override getLocalAddress()194 public SocketAddress getLocalAddress() throws IOException { 195 synchronized (stateLock) { 196 ensureOpen(); 197 return Net.getRevealedLocalAddress(localAddress); 198 } 199 } 200 201 @Override getRemoteAddress()202 public SocketAddress getRemoteAddress() throws IOException { 203 synchronized (stateLock) { 204 ensureOpen(); 205 return remoteAddress; 206 } 207 } 208 209 @Override setOption(SocketOption<T> name, T value)210 public <T> SocketChannel setOption(SocketOption<T> name, T value) 211 throws IOException 212 { 213 Objects.requireNonNull(name); 214 if (!supportedOptions().contains(name)) 215 throw new UnsupportedOperationException("'" + name + "' not supported"); 216 217 synchronized (stateLock) { 218 ensureOpen(); 219 220 if (name == StandardSocketOptions.IP_TOS) { 221 ProtocolFamily family = Net.isIPv6Available() ? 222 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET; 223 Net.setSocketOption(fd, family, name, value); 224 return this; 225 } 226 227 if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) { 228 // SO_REUSEADDR emulated when using exclusive bind 229 isReuseAddress = (Boolean)value; 230 return this; 231 } 232 233 // no options that require special handling 234 Net.setSocketOption(fd, Net.UNSPEC, name, value); 235 return this; 236 } 237 } 238 239 @Override 240 @SuppressWarnings("unchecked") getOption(SocketOption<T> name)241 public <T> T getOption(SocketOption<T> name) 242 throws IOException 243 { 244 Objects.requireNonNull(name); 245 if (!supportedOptions().contains(name)) 246 throw new UnsupportedOperationException("'" + name + "' not supported"); 247 248 synchronized (stateLock) { 249 ensureOpen(); 250 251 if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) { 252 // SO_REUSEADDR emulated when using exclusive bind 253 return (T)Boolean.valueOf(isReuseAddress); 254 } 255 256 // special handling for IP_TOS: always return 0 when IPv6 257 if (name == StandardSocketOptions.IP_TOS) { 258 ProtocolFamily family = Net.isIPv6Available() ? 259 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET; 260 return (T) Net.getSocketOption(fd, family, name); 261 } 262 263 // no options that require special handling 264 return (T) Net.getSocketOption(fd, Net.UNSPEC, name); 265 } 266 } 267 268 private static class DefaultOptionsHolder { 269 static final Set<SocketOption<?>> defaultOptions = defaultOptions(); 270 defaultOptions()271 private static Set<SocketOption<?>> defaultOptions() { 272 HashSet<SocketOption<?>> set = new HashSet<>(); 273 set.add(StandardSocketOptions.SO_SNDBUF); 274 set.add(StandardSocketOptions.SO_RCVBUF); 275 set.add(StandardSocketOptions.SO_KEEPALIVE); 276 set.add(StandardSocketOptions.SO_REUSEADDR); 277 if (Net.isReusePortAvailable()) { 278 set.add(StandardSocketOptions.SO_REUSEPORT); 279 } 280 set.add(StandardSocketOptions.SO_LINGER); 281 set.add(StandardSocketOptions.TCP_NODELAY); 282 // additional options required by socket adaptor 283 set.add(StandardSocketOptions.IP_TOS); 284 set.add(ExtendedSocketOption.SO_OOBINLINE); 285 set.addAll(ExtendedSocketOptions.options(SOCK_STREAM)); 286 return Collections.unmodifiableSet(set); 287 } 288 } 289 290 @Override supportedOptions()291 public final Set<SocketOption<?>> supportedOptions() { 292 return DefaultOptionsHolder.defaultOptions; 293 } 294 295 /** 296 * Marks the beginning of a read operation that might block. 297 * 298 * @throws ClosedChannelException if the channel is closed 299 * @throws NotYetConnectedException if the channel is not yet connected 300 */ beginRead(boolean blocking)301 private void beginRead(boolean blocking) throws ClosedChannelException { 302 if (blocking) { 303 // set hook for Thread.interrupt 304 begin(); 305 306 synchronized (stateLock) { 307 ensureOpenAndConnected(); 308 // record thread so it can be signalled if needed 309 readerThread = NativeThread.current(); 310 } 311 } else { 312 ensureOpenAndConnected(); 313 } 314 } 315 316 /** 317 * Marks the end of a read operation that may have blocked. 318 * 319 * @throws AsynchronousCloseException if the channel was closed due to this 320 * thread being interrupted on a blocking read operation. 321 */ endRead(boolean blocking, boolean completed)322 private void endRead(boolean blocking, boolean completed) 323 throws AsynchronousCloseException 324 { 325 if (blocking) { 326 synchronized (stateLock) { 327 readerThread = 0; 328 // notify any thread waiting in implCloseSelectableChannel 329 if (state == ST_CLOSING) { 330 stateLock.notifyAll(); 331 } 332 } 333 // remove hook for Thread.interrupt 334 end(completed); 335 } 336 } 337 338 @Override read(ByteBuffer buf)339 public int read(ByteBuffer buf) throws IOException { 340 Objects.requireNonNull(buf); 341 342 readLock.lock(); 343 try { 344 boolean blocking = isBlocking(); 345 int n = 0; 346 try { 347 beginRead(blocking); 348 349 // check if input is shutdown 350 if (isInputClosed) 351 return IOStatus.EOF; 352 353 if (blocking) { 354 do { 355 n = IOUtil.read(fd, buf, -1, nd); 356 } while (n == IOStatus.INTERRUPTED && isOpen()); 357 } else { 358 n = IOUtil.read(fd, buf, -1, nd); 359 } 360 } finally { 361 endRead(blocking, n > 0); 362 if (n <= 0 && isInputClosed) 363 return IOStatus.EOF; 364 } 365 return IOStatus.normalize(n); 366 } finally { 367 readLock.unlock(); 368 } 369 } 370 371 @Override read(ByteBuffer[] dsts, int offset, int length)372 public long read(ByteBuffer[] dsts, int offset, int length) 373 throws IOException 374 { 375 Objects.checkFromIndexSize(offset, length, dsts.length); 376 377 readLock.lock(); 378 try { 379 boolean blocking = isBlocking(); 380 long n = 0; 381 try { 382 beginRead(blocking); 383 384 // check if input is shutdown 385 if (isInputClosed) 386 return IOStatus.EOF; 387 388 if (blocking) { 389 do { 390 n = IOUtil.read(fd, dsts, offset, length, nd); 391 } while (n == IOStatus.INTERRUPTED && isOpen()); 392 } else { 393 n = IOUtil.read(fd, dsts, offset, length, nd); 394 } 395 } finally { 396 endRead(blocking, n > 0); 397 if (n <= 0 && isInputClosed) 398 return IOStatus.EOF; 399 } 400 return IOStatus.normalize(n); 401 } finally { 402 readLock.unlock(); 403 } 404 } 405 406 /** 407 * Marks the beginning of a write operation that might block. 408 * 409 * @throws ClosedChannelException if the channel is closed or output shutdown 410 * @throws NotYetConnectedException if the channel is not yet connected 411 */ beginWrite(boolean blocking)412 private void beginWrite(boolean blocking) throws ClosedChannelException { 413 if (blocking) { 414 // set hook for Thread.interrupt 415 begin(); 416 417 synchronized (stateLock) { 418 ensureOpenAndConnected(); 419 if (isOutputClosed) 420 throw new ClosedChannelException(); 421 // record thread so it can be signalled if needed 422 writerThread = NativeThread.current(); 423 } 424 } else { 425 ensureOpenAndConnected(); 426 } 427 } 428 429 /** 430 * Marks the end of a write operation that may have blocked. 431 * 432 * @throws AsynchronousCloseException if the channel was closed due to this 433 * thread being interrupted on a blocking write operation. 434 */ endWrite(boolean blocking, boolean completed)435 private void endWrite(boolean blocking, boolean completed) 436 throws AsynchronousCloseException 437 { 438 if (blocking) { 439 synchronized (stateLock) { 440 writerThread = 0; 441 // notify any thread waiting in implCloseSelectableChannel 442 if (state == ST_CLOSING) { 443 stateLock.notifyAll(); 444 } 445 } 446 // remove hook for Thread.interrupt 447 end(completed); 448 } 449 } 450 451 @Override write(ByteBuffer buf)452 public int write(ByteBuffer buf) throws IOException { 453 Objects.requireNonNull(buf); 454 455 writeLock.lock(); 456 try { 457 boolean blocking = isBlocking(); 458 int n = 0; 459 try { 460 beginWrite(blocking); 461 if (blocking) { 462 do { 463 n = IOUtil.write(fd, buf, -1, nd); 464 } while (n == IOStatus.INTERRUPTED && isOpen()); 465 } else { 466 n = IOUtil.write(fd, buf, -1, nd); 467 } 468 } finally { 469 endWrite(blocking, n > 0); 470 if (n <= 0 && isOutputClosed) 471 throw new AsynchronousCloseException(); 472 } 473 return IOStatus.normalize(n); 474 } finally { 475 writeLock.unlock(); 476 } 477 } 478 479 @Override write(ByteBuffer[] srcs, int offset, int length)480 public long write(ByteBuffer[] srcs, int offset, int length) 481 throws IOException 482 { 483 Objects.checkFromIndexSize(offset, length, srcs.length); 484 485 writeLock.lock(); 486 try { 487 boolean blocking = isBlocking(); 488 long n = 0; 489 try { 490 beginWrite(blocking); 491 if (blocking) { 492 do { 493 n = IOUtil.write(fd, srcs, offset, length, nd); 494 } while (n == IOStatus.INTERRUPTED && isOpen()); 495 } else { 496 n = IOUtil.write(fd, srcs, offset, length, nd); 497 } 498 } finally { 499 endWrite(blocking, n > 0); 500 if (n <= 0 && isOutputClosed) 501 throw new AsynchronousCloseException(); 502 } 503 return IOStatus.normalize(n); 504 } finally { 505 writeLock.unlock(); 506 } 507 } 508 509 /** 510 * Writes a byte of out of band data. 511 */ sendOutOfBandData(byte b)512 int sendOutOfBandData(byte b) throws IOException { 513 writeLock.lock(); 514 try { 515 boolean blocking = isBlocking(); 516 int n = 0; 517 try { 518 beginWrite(blocking); 519 if (blocking) { 520 do { 521 n = sendOutOfBandData(fd, b); 522 } while (n == IOStatus.INTERRUPTED && isOpen()); 523 } else { 524 n = sendOutOfBandData(fd, b); 525 } 526 } finally { 527 endWrite(blocking, n > 0); 528 if (n <= 0 && isOutputClosed) 529 throw new AsynchronousCloseException(); 530 } 531 return IOStatus.normalize(n); 532 } finally { 533 writeLock.unlock(); 534 } 535 } 536 537 @Override implConfigureBlocking(boolean block)538 protected void implConfigureBlocking(boolean block) throws IOException { 539 readLock.lock(); 540 try { 541 writeLock.lock(); 542 try { 543 synchronized (stateLock) { 544 ensureOpen(); 545 IOUtil.configureBlocking(fd, block); 546 } 547 } finally { 548 writeLock.unlock(); 549 } 550 } finally { 551 readLock.unlock(); 552 } 553 } 554 555 /** 556 * Returns the local address, or null if not bound 557 */ localAddress()558 InetSocketAddress localAddress() { 559 synchronized (stateLock) { 560 return localAddress; 561 } 562 } 563 564 /** 565 * Returns the remote address, or null if not connected 566 */ remoteAddress()567 InetSocketAddress remoteAddress() { 568 synchronized (stateLock) { 569 return remoteAddress; 570 } 571 } 572 573 @Override bind(SocketAddress local)574 public SocketChannel bind(SocketAddress local) throws IOException { 575 readLock.lock(); 576 try { 577 writeLock.lock(); 578 try { 579 synchronized (stateLock) { 580 ensureOpen(); 581 if (state == ST_CONNECTIONPENDING) 582 throw new ConnectionPendingException(); 583 if (localAddress != null) 584 throw new AlreadyBoundException(); 585 InetSocketAddress isa = (local == null) ? 586 new InetSocketAddress(0) : Net.checkAddress(local); 587 SecurityManager sm = System.getSecurityManager(); 588 if (sm != null) { 589 sm.checkListen(isa.getPort()); 590 } 591 NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort()); 592 Net.bind(fd, isa.getAddress(), isa.getPort()); 593 localAddress = Net.localAddress(fd); 594 } 595 } finally { 596 writeLock.unlock(); 597 } 598 } finally { 599 readLock.unlock(); 600 } 601 return this; 602 } 603 604 @Override isConnected()605 public boolean isConnected() { 606 return (state == ST_CONNECTED); 607 } 608 609 @Override isConnectionPending()610 public boolean isConnectionPending() { 611 return (state == ST_CONNECTIONPENDING); 612 } 613 614 /** 615 * Marks the beginning of a connect operation that might block. 616 * @param blocking true if configured blocking 617 * @param isa the remote address 618 * @throws ClosedChannelException if the channel is closed 619 * @throws AlreadyConnectedException if already connected 620 * @throws ConnectionPendingException is a connection is pending 621 * @throws IOException if the pre-connect hook fails 622 */ beginConnect(boolean blocking, InetSocketAddress isa)623 private void beginConnect(boolean blocking, InetSocketAddress isa) 624 throws IOException 625 { 626 if (blocking) { 627 // set hook for Thread.interrupt 628 begin(); 629 } 630 synchronized (stateLock) { 631 ensureOpen(); 632 int state = this.state; 633 if (state == ST_CONNECTED) 634 throw new AlreadyConnectedException(); 635 if (state == ST_CONNECTIONPENDING) 636 throw new ConnectionPendingException(); 637 assert state == ST_UNCONNECTED; 638 this.state = ST_CONNECTIONPENDING; 639 640 if (localAddress == null) 641 NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort()); 642 remoteAddress = isa; 643 644 if (blocking) { 645 // record thread so it can be signalled if needed 646 readerThread = NativeThread.current(); 647 } 648 } 649 } 650 651 /** 652 * Marks the end of a connect operation that may have blocked. 653 * 654 * @throws AsynchronousCloseException if the channel was closed due to this 655 * thread being interrupted on a blocking connect operation. 656 * @throws IOException if completed and unable to obtain the local address 657 */ endConnect(boolean blocking, boolean completed)658 private void endConnect(boolean blocking, boolean completed) 659 throws IOException 660 { 661 endRead(blocking, completed); 662 663 if (completed) { 664 synchronized (stateLock) { 665 if (state == ST_CONNECTIONPENDING) { 666 localAddress = Net.localAddress(fd); 667 state = ST_CONNECTED; 668 } 669 } 670 } 671 } 672 673 @Override connect(SocketAddress sa)674 public boolean connect(SocketAddress sa) throws IOException { 675 InetSocketAddress isa = Net.checkAddress(sa); 676 SecurityManager sm = System.getSecurityManager(); 677 if (sm != null) 678 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); 679 680 InetAddress ia = isa.getAddress(); 681 if (ia.isAnyLocalAddress()) 682 ia = InetAddress.getLocalHost(); 683 684 try { 685 readLock.lock(); 686 try { 687 writeLock.lock(); 688 try { 689 int n = 0; 690 boolean blocking = isBlocking(); 691 try { 692 beginConnect(blocking, isa); 693 do { 694 n = Net.connect(fd, ia, isa.getPort()); 695 } while (n == IOStatus.INTERRUPTED && isOpen()); 696 } finally { 697 endConnect(blocking, (n > 0)); 698 } 699 assert IOStatus.check(n); 700 return n > 0; 701 } finally { 702 writeLock.unlock(); 703 } 704 } finally { 705 readLock.unlock(); 706 } 707 } catch (IOException ioe) { 708 // connect failed, close the channel 709 close(); 710 throw SocketExceptions.of(ioe, isa); 711 } 712 } 713 714 /** 715 * Marks the beginning of a finishConnect operation that might block. 716 * 717 * @throws ClosedChannelException if the channel is closed 718 * @throws NoConnectionPendingException if no connection is pending 719 */ beginFinishConnect(boolean blocking)720 private void beginFinishConnect(boolean blocking) throws ClosedChannelException { 721 if (blocking) { 722 // set hook for Thread.interrupt 723 begin(); 724 } 725 synchronized (stateLock) { 726 ensureOpen(); 727 if (state != ST_CONNECTIONPENDING) 728 throw new NoConnectionPendingException(); 729 if (blocking) { 730 // record thread so it can be signalled if needed 731 readerThread = NativeThread.current(); 732 } 733 } 734 } 735 736 /** 737 * Marks the end of a finishConnect operation that may have blocked. 738 * 739 * @throws AsynchronousCloseException if the channel was closed due to this 740 * thread being interrupted on a blocking connect operation. 741 * @throws IOException if completed and unable to obtain the local address 742 */ endFinishConnect(boolean blocking, boolean completed)743 private void endFinishConnect(boolean blocking, boolean completed) 744 throws IOException 745 { 746 endRead(blocking, completed); 747 748 if (completed) { 749 synchronized (stateLock) { 750 if (state == ST_CONNECTIONPENDING) { 751 localAddress = Net.localAddress(fd); 752 state = ST_CONNECTED; 753 } 754 } 755 } 756 } 757 758 @Override finishConnect()759 public boolean finishConnect() throws IOException { 760 try { 761 readLock.lock(); 762 try { 763 writeLock.lock(); 764 try { 765 // no-op if already connected 766 if (isConnected()) 767 return true; 768 769 boolean blocking = isBlocking(); 770 boolean connected = false; 771 try { 772 beginFinishConnect(blocking); 773 int n = 0; 774 if (blocking) { 775 do { 776 n = checkConnect(fd, true); 777 } while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen()); 778 } else { 779 n = checkConnect(fd, false); 780 } 781 connected = (n > 0); 782 } finally { 783 endFinishConnect(blocking, connected); 784 } 785 assert (blocking && connected) ^ !blocking; 786 return connected; 787 } finally { 788 writeLock.unlock(); 789 } 790 } finally { 791 readLock.unlock(); 792 } 793 } catch (IOException ioe) { 794 // connect failed, close the channel 795 close(); 796 throw SocketExceptions.of(ioe, remoteAddress); 797 } 798 } 799 800 /** 801 * Invoked by implCloseChannel to close the channel. 802 * 803 * This method waits for outstanding I/O operations to complete. When in 804 * blocking mode, the socket is pre-closed and the threads in blocking I/O 805 * operations are signalled to ensure that the outstanding I/O operations 806 * complete quickly. 807 * 808 * If the socket is connected then it is shutdown by this method. The 809 * shutdown ensures that the peer reads EOF for the case that the socket is 810 * not pre-closed or closed by this method. 811 * 812 * The socket is closed by this method when it is not registered with a 813 * Selector. Note that a channel configured blocking may be registered with 814 * a Selector. This arises when a key is canceled and the channel configured 815 * to blocking mode before the key is flushed from the Selector. 816 */ 817 @Override implCloseSelectableChannel()818 protected void implCloseSelectableChannel() throws IOException { 819 assert !isOpen(); 820 821 boolean blocking; 822 boolean connected; 823 boolean interrupted = false; 824 825 // set state to ST_CLOSING 826 synchronized (stateLock) { 827 assert state < ST_CLOSING; 828 blocking = isBlocking(); 829 connected = (state == ST_CONNECTED); 830 state = ST_CLOSING; 831 } 832 833 // wait for any outstanding I/O operations to complete 834 if (blocking) { 835 synchronized (stateLock) { 836 assert state == ST_CLOSING; 837 long reader = readerThread; 838 long writer = writerThread; 839 if (reader != 0 || writer != 0) { 840 nd.preClose(fd); 841 connected = false; // fd is no longer connected socket 842 843 if (reader != 0) 844 NativeThread.signal(reader); 845 if (writer != 0) 846 NativeThread.signal(writer); 847 848 // wait for blocking I/O operations to end 849 while (readerThread != 0 || writerThread != 0) { 850 try { 851 stateLock.wait(); 852 } catch (InterruptedException e) { 853 interrupted = true; 854 } 855 } 856 } 857 } 858 } else { 859 // non-blocking mode: wait for read/write to complete 860 readLock.lock(); 861 try { 862 writeLock.lock(); 863 writeLock.unlock(); 864 } finally { 865 readLock.unlock(); 866 } 867 } 868 869 // set state to ST_KILLPENDING 870 synchronized (stateLock) { 871 assert state == ST_CLOSING; 872 // if connected and the channel is registered with a Selector then 873 // shutdown the output if possible so that the peer reads EOF. If 874 // SO_LINGER is enabled and set to a non-zero value then it needs to 875 // be disabled so that the Selector does not wait when it closes 876 // the socket. 877 if (connected && isRegistered()) { 878 try { 879 SocketOption<Integer> opt = StandardSocketOptions.SO_LINGER; 880 int interval = (int) Net.getSocketOption(fd, Net.UNSPEC, opt); 881 if (interval != 0) { 882 if (interval > 0) { 883 // disable SO_LINGER 884 Net.setSocketOption(fd, Net.UNSPEC, opt, -1); 885 } 886 Net.shutdown(fd, Net.SHUT_WR); 887 } 888 } catch (IOException ignore) { } 889 } 890 state = ST_KILLPENDING; 891 } 892 893 // close socket if not registered with Selector 894 if (!isRegistered()) 895 kill(); 896 897 // restore interrupt status 898 if (interrupted) 899 Thread.currentThread().interrupt(); 900 } 901 902 @Override 903 public void kill() throws IOException { 904 synchronized (stateLock) { 905 if (state == ST_KILLPENDING) { 906 state = ST_KILLED; 907 nd.close(fd); 908 } 909 } 910 } 911 912 @Override 913 public SocketChannel shutdownInput() throws IOException { 914 synchronized (stateLock) { 915 ensureOpen(); 916 if (!isConnected()) 917 throw new NotYetConnectedException(); 918 if (!isInputClosed) { 919 Net.shutdown(fd, Net.SHUT_RD); 920 long thread = readerThread; 921 if (thread != 0) 922 NativeThread.signal(thread); 923 isInputClosed = true; 924 } 925 return this; 926 } 927 } 928 929 @Override 930 public SocketChannel shutdownOutput() throws IOException { 931 synchronized (stateLock) { 932 ensureOpen(); 933 if (!isConnected()) 934 throw new NotYetConnectedException(); 935 if (!isOutputClosed) { 936 Net.shutdown(fd, Net.SHUT_WR); 937 long thread = writerThread; 938 if (thread != 0) 939 NativeThread.signal(thread); 940 isOutputClosed = true; 941 } 942 return this; 943 } 944 } 945 946 boolean isInputOpen() { 947 return !isInputClosed; 948 } 949 950 boolean isOutputOpen() { 951 return !isOutputClosed; 952 } 953 954 /** 955 * Poll this channel's socket for reading up to the given timeout. 956 * @return {@code true} if the socket is polled 957 */ 958 boolean pollRead(long timeout) throws IOException { 959 boolean blocking = isBlocking(); 960 assert Thread.holdsLock(blockingLock()) && blocking; 961 962 readLock.lock(); 963 try { 964 boolean polled = false; 965 try { 966 beginRead(blocking); 967 int events = Net.poll(fd, Net.POLLIN, timeout); 968 polled = (events != 0); 969 } finally { 970 endRead(blocking, polled); 971 } 972 return polled; 973 } finally { 974 readLock.unlock(); 975 } 976 } 977 978 /** 979 * Poll this channel's socket for a connection, up to the given timeout. 980 * @return {@code true} if the socket is polled 981 */ 982 boolean pollConnected(long timeout) throws IOException { 983 boolean blocking = isBlocking(); 984 assert Thread.holdsLock(blockingLock()) && blocking; 985 986 readLock.lock(); 987 try { 988 writeLock.lock(); 989 try { 990 boolean polled = false; 991 try { 992 beginFinishConnect(blocking); 993 int events = Net.poll(fd, Net.POLLCONN, timeout); 994 polled = (events != 0); 995 } finally { 996 // invoke endFinishConnect with completed = false so that 997 // the state is not changed to ST_CONNECTED. The socket 998 // adaptor will use finishConnect to finish. 999 endFinishConnect(blocking, /*completed*/false); 1000 } 1001 return polled; 1002 } finally { 1003 writeLock.unlock(); 1004 } 1005 } finally { 1006 readLock.unlock(); 1007 } 1008 } 1009 1010 /** 1011 * Translates native poll revent ops into a ready operation ops 1012 */ 1013 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) { 1014 int intOps = ski.nioInterestOps(); 1015 int oldOps = ski.nioReadyOps(); 1016 int newOps = initialOps; 1017 1018 if ((ops & Net.POLLNVAL) != 0) { 1019 // This should only happen if this channel is pre-closed while a 1020 // selection operation is in progress 1021 // ## Throw an error if this channel has not been pre-closed 1022 return false; 1023 } 1024 1025 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) { 1026 newOps = intOps; 1027 ski.nioReadyOps(newOps); 1028 return (newOps & ~oldOps) != 0; 1029 } 1030 1031 boolean connected = isConnected(); 1032 if (((ops & Net.POLLIN) != 0) && 1033 ((intOps & SelectionKey.OP_READ) != 0) && connected) 1034 newOps |= SelectionKey.OP_READ; 1035 1036 if (((ops & Net.POLLCONN) != 0) && 1037 ((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending()) 1038 newOps |= SelectionKey.OP_CONNECT; 1039 1040 if (((ops & Net.POLLOUT) != 0) && 1041 ((intOps & SelectionKey.OP_WRITE) != 0) && connected) 1042 newOps |= SelectionKey.OP_WRITE; 1043 1044 ski.nioReadyOps(newOps); 1045 return (newOps & ~oldOps) != 0; 1046 } 1047 1048 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) { 1049 return translateReadyOps(ops, ski.nioReadyOps(), ski); 1050 } 1051 1052 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) { 1053 return translateReadyOps(ops, 0, ski); 1054 } 1055 1056 /** 1057 * Translates an interest operation set into a native poll event set 1058 */ 1059 public int translateInterestOps(int ops) { 1060 int newOps = 0; 1061 if ((ops & SelectionKey.OP_READ) != 0) 1062 newOps |= Net.POLLIN; 1063 if ((ops & SelectionKey.OP_WRITE) != 0) 1064 newOps |= Net.POLLOUT; 1065 if ((ops & SelectionKey.OP_CONNECT) != 0) 1066 newOps |= Net.POLLCONN; 1067 return newOps; 1068 } 1069 1070 public FileDescriptor getFD() { 1071 return fd; 1072 } 1073 1074 public int getFDVal() { 1075 return fdVal; 1076 } 1077 1078 @Override 1079 public String toString() { 1080 StringBuilder sb = new StringBuilder(); 1081 sb.append(this.getClass().getSuperclass().getName()); 1082 sb.append('['); 1083 if (!isOpen()) 1084 sb.append("closed"); 1085 else { 1086 synchronized (stateLock) { 1087 switch (state) { 1088 case ST_UNCONNECTED: 1089 sb.append("unconnected"); 1090 break; 1091 case ST_CONNECTIONPENDING: 1092 sb.append("connection-pending"); 1093 break; 1094 case ST_CONNECTED: 1095 sb.append("connected"); 1096 if (isInputClosed) 1097 sb.append(" ishut"); 1098 if (isOutputClosed) 1099 sb.append(" oshut"); 1100 break; 1101 } 1102 InetSocketAddress addr = localAddress(); 1103 if (addr != null) { 1104 sb.append(" local="); 1105 sb.append(Net.getRevealedLocalAddressAsString(addr)); 1106 } 1107 if (remoteAddress() != null) { 1108 sb.append(" remote="); 1109 sb.append(remoteAddress().toString()); 1110 } 1111 } 1112 } 1113 sb.append(']'); 1114 return sb.toString(); 1115 } 1116 1117 1118 // -- Native methods -- 1119 1120 private static native int checkConnect(FileDescriptor fd, boolean block) 1121 throws IOException; 1122 1123 private static native int sendOutOfBandData(FileDescriptor fd, byte data) 1124 throws IOException; 1125 1126 static { 1127 IOUtil.load(); 1128 nd = new SocketDispatcher(); 1129 } 1130 1131 } 1132