1 /* 2 * Copyright (c) 2009, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch.sctp; 26 27 import java.net.InetAddress; 28 import java.net.SocketAddress; 29 import java.net.SocketException; 30 import java.net.InetSocketAddress; 31 import java.io.FileDescriptor; 32 import java.io.IOException; 33 import java.util.Collections; 34 import java.util.Set; 35 import java.util.HashSet; 36 import java.nio.ByteBuffer; 37 import java.nio.channels.SelectionKey; 38 import java.nio.channels.ClosedChannelException; 39 import java.nio.channels.ConnectionPendingException; 40 import java.nio.channels.NoConnectionPendingException; 41 import java.nio.channels.AlreadyConnectedException; 42 import java.nio.channels.NotYetBoundException; 43 import java.nio.channels.NotYetConnectedException; 44 import java.nio.channels.spi.SelectorProvider; 45 import com.sun.nio.sctp.AbstractNotificationHandler; 46 import com.sun.nio.sctp.Association; 47 import com.sun.nio.sctp.AssociationChangeNotification; 48 import com.sun.nio.sctp.HandlerResult; 49 import com.sun.nio.sctp.IllegalReceiveException; 50 import com.sun.nio.sctp.InvalidStreamException; 51 import com.sun.nio.sctp.IllegalUnbindException; 52 import com.sun.nio.sctp.MessageInfo; 53 import com.sun.nio.sctp.NotificationHandler; 54 import com.sun.nio.sctp.SctpChannel; 55 import com.sun.nio.sctp.SctpSocketOption; 56 import sun.nio.ch.DirectBuffer; 57 import sun.nio.ch.IOStatus; 58 import sun.nio.ch.IOUtil; 59 import sun.nio.ch.NativeThread; 60 import sun.nio.ch.Net; 61 import sun.nio.ch.SelChImpl; 62 import sun.nio.ch.SelectionKeyImpl; 63 import sun.nio.ch.Util; 64 import static com.sun.nio.sctp.SctpStandardSocketOptions.*; 65 import static sun.nio.ch.sctp.ResultContainer.SEND_FAILED; 66 import static sun.nio.ch.sctp.ResultContainer.ASSOCIATION_CHANGED; 67 import static sun.nio.ch.sctp.ResultContainer.PEER_ADDRESS_CHANGED; 68 import static sun.nio.ch.sctp.ResultContainer.SHUTDOWN; 69 70 /** 71 * An implementation of an SctpChannel 72 */ 73 public class SctpChannelImpl extends SctpChannel 74 implements SelChImpl 75 { 76 private final FileDescriptor fd; 77 78 private final int fdVal; 79 80 /* IDs of native threads doing send and receivess, for signalling */ 81 private volatile long receiverThread = 0; 82 private volatile long senderThread = 0; 83 84 /* Lock held by current receiving or connecting thread */ 85 private final Object receiveLock = new Object(); 86 87 /* Lock held by current sending or connecting thread */ 88 private final Object sendLock = new Object(); 89 90 private final ThreadLocal<Boolean> receiveInvoked = 91 new ThreadLocal<Boolean>() { 92 @Override protected Boolean initialValue() { 93 return Boolean.FALSE; 94 } 95 }; 96 97 /* Lock held by any thread that modifies the state fields declared below 98 DO NOT invoke a blocking I/O operation while holding this lock! */ 99 private final Object stateLock = new Object(); 100 101 private enum ChannelState { 102 UNINITIALIZED, 103 UNCONNECTED, 104 PENDING, 105 CONNECTED, 106 KILLPENDING, 107 KILLED, 108 } 109 /* -- The following fields are protected by stateLock -- */ 110 private ChannelState state = ChannelState.UNINITIALIZED; 111 112 /* Binding; Once bound the port will remain constant. */ 113 int port = -1; 114 private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>(); 115 /* Has the channel been bound to the wildcard address */ 116 private boolean wildcard; /* false */ 117 //private InetSocketAddress remoteAddress = null; 118 119 /* Input/Output open */ 120 private boolean readyToConnect; 121 122 /* Shutdown */ 123 private boolean isShutdown; 124 125 private Association association; 126 127 private Set<SocketAddress> remoteAddresses = Collections.emptySet(); 128 129 /* -- End of fields protected by stateLock -- */ 130 131 /** 132 * Constructor for normal connecting sockets 133 */ SctpChannelImpl(SelectorProvider provider)134 public SctpChannelImpl(SelectorProvider provider) throws IOException { 135 //TODO: update provider remove public modifier 136 super(provider); 137 this.fd = SctpNet.socket(true); 138 this.fdVal = IOUtil.fdVal(fd); 139 this.state = ChannelState.UNCONNECTED; 140 } 141 142 /** 143 * Constructor for sockets obtained from server sockets 144 */ SctpChannelImpl(SelectorProvider provider, FileDescriptor fd)145 public SctpChannelImpl(SelectorProvider provider, FileDescriptor fd) 146 throws IOException { 147 this(provider, fd, null); 148 } 149 150 /** 151 * Constructor for sockets obtained from branching 152 */ SctpChannelImpl(SelectorProvider provider, FileDescriptor fd, Association association)153 public SctpChannelImpl(SelectorProvider provider, 154 FileDescriptor fd, 155 Association association) 156 throws IOException { 157 super(provider); 158 this.fd = fd; 159 this.fdVal = IOUtil.fdVal(fd); 160 this.state = ChannelState.CONNECTED; 161 port = (Net.localAddress(fd)).getPort(); 162 163 if (association != null) { /* branched */ 164 this.association = association; 165 } else { /* obtained from server channel */ 166 /* Receive COMM_UP */ 167 ByteBuffer buf = Util.getTemporaryDirectBuffer(50); 168 try { 169 receive(buf, null, null, true); 170 } finally { 171 Util.releaseTemporaryDirectBuffer(buf); 172 } 173 } 174 } 175 176 /** 177 * Binds the channel's socket to a local address. 178 */ 179 @Override bind(SocketAddress local)180 public SctpChannel bind(SocketAddress local) throws IOException { 181 synchronized (receiveLock) { 182 synchronized (sendLock) { 183 synchronized (stateLock) { 184 ensureOpenAndUnconnected(); 185 if (isBound()) 186 SctpNet.throwAlreadyBoundException(); 187 InetSocketAddress isa = (local == null) ? 188 new InetSocketAddress(0) : Net.checkAddress(local); 189 SecurityManager sm = System.getSecurityManager(); 190 if (sm != null) { 191 sm.checkListen(isa.getPort()); 192 } 193 Net.bind(fd, isa.getAddress(), isa.getPort()); 194 InetSocketAddress boundIsa = Net.localAddress(fd); 195 port = boundIsa.getPort(); 196 localAddresses.add(isa); 197 if (isa.getAddress().isAnyLocalAddress()) 198 wildcard = true; 199 } 200 } 201 } 202 return this; 203 } 204 205 @Override bindAddress(InetAddress address)206 public SctpChannel bindAddress(InetAddress address) 207 throws IOException { 208 bindUnbindAddress(address, true); 209 localAddresses.add(new InetSocketAddress(address, port)); 210 return this; 211 } 212 213 @Override unbindAddress(InetAddress address)214 public SctpChannel unbindAddress(InetAddress address) 215 throws IOException { 216 bindUnbindAddress(address, false); 217 localAddresses.remove(new InetSocketAddress(address, port)); 218 return this; 219 } 220 bindUnbindAddress(InetAddress address, boolean add)221 private SctpChannel bindUnbindAddress(InetAddress address, boolean add) 222 throws IOException { 223 if (address == null) 224 throw new IllegalArgumentException(); 225 226 synchronized (receiveLock) { 227 synchronized (sendLock) { 228 synchronized (stateLock) { 229 if (!isOpen()) 230 throw new ClosedChannelException(); 231 if (!isBound()) 232 throw new NotYetBoundException(); 233 if (wildcard) 234 throw new IllegalStateException( 235 "Cannot add or remove addresses from a channel that is bound to the wildcard address"); 236 if (address.isAnyLocalAddress()) 237 throw new IllegalArgumentException( 238 "Cannot add or remove the wildcard address"); 239 if (add) { 240 for (InetSocketAddress addr : localAddresses) { 241 if (addr.getAddress().equals(address)) { 242 SctpNet.throwAlreadyBoundException(); 243 } 244 } 245 } else { /*removing */ 246 /* Verify that there is more than one address 247 * and that address is already bound */ 248 if (localAddresses.size() <= 1) 249 throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound"); 250 boolean foundAddress = false; 251 for (InetSocketAddress addr : localAddresses) { 252 if (addr.getAddress().equals(address)) { 253 foundAddress = true; 254 break; 255 } 256 } 257 if (!foundAddress ) 258 throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address"); 259 } 260 261 SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add); 262 263 /* Update our internal Set to reflect the addition/removal */ 264 if (add) 265 localAddresses.add(new InetSocketAddress(address, port)); 266 else { 267 for (InetSocketAddress addr : localAddresses) { 268 if (addr.getAddress().equals(address)) { 269 localAddresses.remove(addr); 270 break; 271 } 272 } 273 } 274 } 275 } 276 } 277 return this; 278 } 279 isBound()280 private boolean isBound() { 281 synchronized (stateLock) { 282 return port == -1 ? false : true; 283 } 284 } 285 isConnected()286 private boolean isConnected() { 287 synchronized (stateLock) { 288 return (state == ChannelState.CONNECTED); 289 } 290 } 291 ensureOpenAndUnconnected()292 private void ensureOpenAndUnconnected() throws IOException { 293 synchronized (stateLock) { 294 if (!isOpen()) 295 throw new ClosedChannelException(); 296 if (isConnected()) 297 throw new AlreadyConnectedException(); 298 if (state == ChannelState.PENDING) 299 throw new ConnectionPendingException(); 300 } 301 } 302 ensureReceiveOpen()303 private boolean ensureReceiveOpen() throws ClosedChannelException { 304 synchronized (stateLock) { 305 if (!isOpen()) 306 throw new ClosedChannelException(); 307 if (!isConnected()) 308 throw new NotYetConnectedException(); 309 else 310 return true; 311 } 312 } 313 ensureSendOpen()314 private void ensureSendOpen() throws ClosedChannelException { 315 synchronized (stateLock) { 316 if (!isOpen()) 317 throw new ClosedChannelException(); 318 if (isShutdown) 319 throw new ClosedChannelException(); 320 if (!isConnected()) 321 throw new NotYetConnectedException(); 322 } 323 } 324 receiverCleanup()325 private void receiverCleanup() throws IOException { 326 synchronized (stateLock) { 327 receiverThread = 0; 328 if (state == ChannelState.KILLPENDING) 329 kill(); 330 } 331 } 332 senderCleanup()333 private void senderCleanup() throws IOException { 334 synchronized (stateLock) { 335 senderThread = 0; 336 if (state == ChannelState.KILLPENDING) 337 kill(); 338 } 339 } 340 341 @Override association()342 public Association association() throws ClosedChannelException { 343 synchronized (stateLock) { 344 if (!isOpen()) 345 throw new ClosedChannelException(); 346 if (!isConnected()) 347 return null; 348 349 return association; 350 } 351 } 352 353 @Override connect(SocketAddress endpoint)354 public boolean connect(SocketAddress endpoint) throws IOException { 355 synchronized (receiveLock) { 356 synchronized (sendLock) { 357 ensureOpenAndUnconnected(); 358 InetSocketAddress isa = Net.checkAddress(endpoint); 359 SecurityManager sm = System.getSecurityManager(); 360 if (sm != null) 361 sm.checkConnect(isa.getAddress().getHostAddress(), 362 isa.getPort()); 363 synchronized (blockingLock()) { 364 int n = 0; 365 try { 366 try { 367 begin(); 368 synchronized (stateLock) { 369 if (!isOpen()) { 370 return false; 371 } 372 receiverThread = NativeThread.current(); 373 } 374 for (;;) { 375 InetAddress ia = isa.getAddress(); 376 if (ia.isAnyLocalAddress()) 377 ia = InetAddress.getLocalHost(); 378 n = SctpNet.connect(fdVal, ia, isa.getPort()); 379 if ( (n == IOStatus.INTERRUPTED) 380 && isOpen()) 381 continue; 382 break; 383 } 384 } finally { 385 receiverCleanup(); 386 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 387 assert IOStatus.check(n); 388 } 389 } catch (IOException x) { 390 /* If an exception was thrown, close the channel after 391 * invoking end() so as to avoid bogus 392 * AsynchronousCloseExceptions */ 393 close(); 394 throw x; 395 } 396 397 if (n > 0) { 398 synchronized (stateLock) { 399 /* Connection succeeded */ 400 state = ChannelState.CONNECTED; 401 if (!isBound()) { 402 InetSocketAddress boundIsa = 403 Net.localAddress(fd); 404 port = boundIsa.getPort(); 405 } 406 407 /* Receive COMM_UP */ 408 ByteBuffer buf = Util.getTemporaryDirectBuffer(50); 409 try { 410 receive(buf, null, null, true); 411 } finally { 412 Util.releaseTemporaryDirectBuffer(buf); 413 } 414 415 /* cache remote addresses */ 416 try { 417 remoteAddresses = getRemoteAddresses(); 418 } catch (IOException unused) { /* swallow exception */ } 419 420 return true; 421 } 422 } else { 423 synchronized (stateLock) { 424 /* If nonblocking and no exception then connection 425 * pending; disallow another invocation */ 426 if (!isBlocking()) 427 state = ChannelState.PENDING; 428 else 429 assert false; 430 } 431 } 432 } 433 return false; 434 } 435 } 436 } 437 438 @Override connect(SocketAddress endpoint, int maxOutStreams, int maxInStreams)439 public boolean connect(SocketAddress endpoint, 440 int maxOutStreams, 441 int maxInStreams) 442 throws IOException { 443 ensureOpenAndUnconnected(); 444 return setOption(SCTP_INIT_MAXSTREAMS, InitMaxStreams. 445 create(maxInStreams, maxOutStreams)).connect(endpoint); 446 447 } 448 449 @Override isConnectionPending()450 public boolean isConnectionPending() { 451 synchronized (stateLock) { 452 return (state == ChannelState.PENDING); 453 } 454 } 455 456 @Override finishConnect()457 public boolean finishConnect() throws IOException { 458 synchronized (receiveLock) { 459 synchronized (sendLock) { 460 synchronized (stateLock) { 461 if (!isOpen()) 462 throw new ClosedChannelException(); 463 if (isConnected()) 464 return true; 465 if (state != ChannelState.PENDING) 466 throw new NoConnectionPendingException(); 467 } 468 int n = 0; 469 try { 470 try { 471 begin(); 472 synchronized (blockingLock()) { 473 synchronized (stateLock) { 474 if (!isOpen()) { 475 return false; 476 } 477 receiverThread = NativeThread.current(); 478 } 479 if (!isBlocking()) { 480 for (;;) { 481 n = checkConnect(fd, false, readyToConnect); 482 if ( (n == IOStatus.INTERRUPTED) 483 && isOpen()) 484 continue; 485 break; 486 } 487 } else { 488 for (;;) { 489 n = checkConnect(fd, true, readyToConnect); 490 if (n == 0) { 491 // Loop in case of 492 // spurious notifications 493 continue; 494 } 495 if ( (n == IOStatus.INTERRUPTED) 496 && isOpen()) 497 continue; 498 break; 499 } 500 } 501 } 502 } finally { 503 synchronized (stateLock) { 504 receiverThread = 0; 505 if (state == ChannelState.KILLPENDING) { 506 kill(); 507 /* poll()/getsockopt() does not report 508 * error (throws exception, with n = 0) 509 * on Linux platform after dup2 and 510 * signal-wakeup. Force n to 0 so the 511 * end() can throw appropriate exception */ 512 n = 0; 513 } 514 } 515 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 516 assert IOStatus.check(n); 517 } 518 } catch (IOException x) { 519 /* If an exception was thrown, close the channel after 520 * invoking end() so as to avoid bogus 521 * AsynchronousCloseExceptions */ 522 close(); 523 throw x; 524 } 525 526 if (n > 0) { 527 synchronized (stateLock) { 528 state = ChannelState.CONNECTED; 529 if (!isBound()) { 530 InetSocketAddress boundIsa = 531 Net.localAddress(fd); 532 port = boundIsa.getPort(); 533 } 534 535 /* Receive COMM_UP */ 536 ByteBuffer buf = Util.getTemporaryDirectBuffer(50); 537 try { 538 receive(buf, null, null, true); 539 } finally { 540 Util.releaseTemporaryDirectBuffer(buf); 541 } 542 543 /* cache remote addresses */ 544 try { 545 remoteAddresses = getRemoteAddresses(); 546 } catch (IOException unused) { /* swallow exception */ } 547 548 return true; 549 } 550 } 551 } 552 } 553 return false; 554 } 555 556 @Override implConfigureBlocking(boolean block)557 protected void implConfigureBlocking(boolean block) throws IOException { 558 IOUtil.configureBlocking(fd, block); 559 } 560 561 @Override implCloseSelectableChannel()562 public void implCloseSelectableChannel() throws IOException { 563 synchronized (stateLock) { 564 SctpNet.preClose(fdVal); 565 566 if (receiverThread != 0) 567 NativeThread.signal(receiverThread); 568 569 if (senderThread != 0) 570 NativeThread.signal(senderThread); 571 572 if (!isRegistered()) 573 kill(); 574 } 575 } 576 577 @Override getFD()578 public FileDescriptor getFD() { 579 return fd; 580 } 581 582 @Override getFDVal()583 public int getFDVal() { 584 return fdVal; 585 } 586 587 /** 588 * Translates native poll revent ops into a ready operation ops 589 */ translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk)590 private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) { 591 int intOps = sk.nioInterestOps(); 592 int oldOps = sk.nioReadyOps(); 593 int newOps = initialOps; 594 595 if ((ops & Net.POLLNVAL) != 0) { 596 /* This should only happen if this channel is pre-closed while a 597 * selection operation is in progress 598 * ## Throw an error if this channel has not been pre-closed */ 599 return false; 600 } 601 602 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) { 603 newOps = intOps; 604 sk.nioReadyOps(newOps); 605 /* No need to poll again in checkConnect, 606 * the error will be detected there */ 607 readyToConnect = true; 608 return (newOps & ~oldOps) != 0; 609 } 610 611 if (((ops & Net.POLLIN) != 0) && 612 ((intOps & SelectionKey.OP_READ) != 0) && 613 isConnected()) 614 newOps |= SelectionKey.OP_READ; 615 616 if (((ops & Net.POLLCONN) != 0) && 617 ((intOps & SelectionKey.OP_CONNECT) != 0) && 618 ((state == ChannelState.UNCONNECTED) || (state == ChannelState.PENDING))) { 619 newOps |= SelectionKey.OP_CONNECT; 620 readyToConnect = true; 621 } 622 623 if (((ops & Net.POLLOUT) != 0) && 624 ((intOps & SelectionKey.OP_WRITE) != 0) && 625 isConnected()) 626 newOps |= SelectionKey.OP_WRITE; 627 628 sk.nioReadyOps(newOps); 629 return (newOps & ~oldOps) != 0; 630 } 631 632 @Override translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk)633 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) { 634 return translateReadyOps(ops, sk.nioReadyOps(), sk); 635 } 636 637 @Override 638 @SuppressWarnings("all") translateAndSetReadyOps(int ops, SelectionKeyImpl sk)639 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) { 640 return translateReadyOps(ops, 0, sk); 641 } 642 643 @Override translateInterestOps(int ops)644 public int translateInterestOps(int ops) { 645 int newOps = 0; 646 if ((ops & SelectionKey.OP_READ) != 0) 647 newOps |= Net.POLLIN; 648 if ((ops & SelectionKey.OP_WRITE) != 0) 649 newOps |= Net.POLLOUT; 650 if ((ops & SelectionKey.OP_CONNECT) != 0) 651 newOps |= Net.POLLCONN; 652 return newOps; 653 } 654 655 @Override kill()656 public void kill() throws IOException { 657 synchronized (stateLock) { 658 if (state == ChannelState.KILLED) 659 return; 660 if (state == ChannelState.UNINITIALIZED) { 661 state = ChannelState.KILLED; 662 return; 663 } 664 assert !isOpen() && !isRegistered(); 665 666 /* Postpone the kill if there is a waiting reader 667 * or writer thread. */ 668 if (receiverThread == 0 && senderThread == 0) { 669 SctpNet.close(fdVal); 670 state = ChannelState.KILLED; 671 } else { 672 state = ChannelState.KILLPENDING; 673 } 674 } 675 } 676 677 @Override setOption(SctpSocketOption<T> name, T value)678 public <T> SctpChannel setOption(SctpSocketOption<T> name, T value) 679 throws IOException { 680 if (name == null) 681 throw new NullPointerException(); 682 if (!supportedOptions().contains(name)) 683 throw new UnsupportedOperationException("'" + name + "' not supported"); 684 685 synchronized (stateLock) { 686 if (!isOpen()) 687 throw new ClosedChannelException(); 688 689 SctpNet.setSocketOption(fdVal, name, value, 0 /*oneToOne*/); 690 } 691 return this; 692 } 693 694 @Override 695 @SuppressWarnings("unchecked") getOption(SctpSocketOption<T> name)696 public <T> T getOption(SctpSocketOption<T> name) throws IOException { 697 if (name == null) 698 throw new NullPointerException(); 699 if (!supportedOptions().contains(name)) 700 throw new UnsupportedOperationException("'" + name + "' not supported"); 701 702 synchronized (stateLock) { 703 if (!isOpen()) 704 throw new ClosedChannelException(); 705 706 return (T)SctpNet.getSocketOption(fdVal, name, 0 /*oneToOne*/); 707 } 708 } 709 710 private static class DefaultOptionsHolder { 711 static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions(); 712 defaultOptions()713 private static Set<SctpSocketOption<?>> defaultOptions() { 714 HashSet<SctpSocketOption<?>> set = new HashSet<SctpSocketOption<?>>(10); 715 set.add(SCTP_DISABLE_FRAGMENTS); 716 set.add(SCTP_EXPLICIT_COMPLETE); 717 set.add(SCTP_FRAGMENT_INTERLEAVE); 718 set.add(SCTP_INIT_MAXSTREAMS); 719 set.add(SCTP_NODELAY); 720 set.add(SCTP_PRIMARY_ADDR); 721 set.add(SCTP_SET_PEER_PRIMARY_ADDR); 722 set.add(SO_SNDBUF); 723 set.add(SO_RCVBUF); 724 set.add(SO_LINGER); 725 return Collections.unmodifiableSet(set); 726 } 727 } 728 729 @Override supportedOptions()730 public final Set<SctpSocketOption<?>> supportedOptions() { 731 return DefaultOptionsHolder.defaultOptions; 732 } 733 734 @Override receive(ByteBuffer buffer, T attachment, NotificationHandler<T> handler)735 public <T> MessageInfo receive(ByteBuffer buffer, 736 T attachment, 737 NotificationHandler<T> handler) 738 throws IOException { 739 return receive(buffer, attachment, handler, false); 740 } 741 receive(ByteBuffer buffer, T attachment, NotificationHandler<T> handler, boolean fromConnect)742 private <T> MessageInfo receive(ByteBuffer buffer, 743 T attachment, 744 NotificationHandler<T> handler, 745 boolean fromConnect) 746 throws IOException { 747 if (buffer == null) 748 throw new IllegalArgumentException("buffer cannot be null"); 749 750 if (buffer.isReadOnly()) 751 throw new IllegalArgumentException("Read-only buffer"); 752 753 if (receiveInvoked.get()) 754 throw new IllegalReceiveException( 755 "cannot invoke receive from handler"); 756 receiveInvoked.set(Boolean.TRUE); 757 758 try { 759 ResultContainer resultContainer = new ResultContainer(); 760 do { 761 resultContainer.clear(); 762 synchronized (receiveLock) { 763 if (!ensureReceiveOpen()) 764 return null; 765 766 int n = 0; 767 try { 768 begin(); 769 770 synchronized (stateLock) { 771 if(!isOpen()) 772 return null; 773 receiverThread = NativeThread.current(); 774 } 775 776 do { 777 n = receive(fdVal, buffer, resultContainer, fromConnect); 778 } while ((n == IOStatus.INTERRUPTED) && isOpen()); 779 } finally { 780 receiverCleanup(); 781 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 782 assert IOStatus.check(n); 783 } 784 785 if (!resultContainer.isNotification()) { 786 /* message or nothing */ 787 if (resultContainer.hasSomething()) { 788 /* Set the association before returning */ 789 MessageInfoImpl info = 790 resultContainer.getMessageInfo(); 791 synchronized (stateLock) { 792 assert association != null; 793 info.setAssociation(association); 794 } 795 return info; 796 } else 797 /* Non-blocking may return null if nothing available*/ 798 return null; 799 } else { /* notification */ 800 synchronized (stateLock) { 801 handleNotificationInternal( 802 resultContainer); 803 } 804 } 805 806 if (fromConnect) { 807 /* If we reach here, then it was connect that invoked 808 * receive and received the COMM_UP. We have already 809 * handled the COMM_UP with the internal notification 810 * handler. Simply return. */ 811 return null; 812 } 813 } /* receiveLock */ 814 } while (handler == null ? true : 815 (invokeNotificationHandler(resultContainer, handler, attachment) 816 == HandlerResult.CONTINUE)); 817 818 return null; 819 } finally { 820 receiveInvoked.set(Boolean.FALSE); 821 } 822 } 823 receive(int fd, ByteBuffer dst, ResultContainer resultContainer, boolean peek)824 private int receive(int fd, 825 ByteBuffer dst, 826 ResultContainer resultContainer, 827 boolean peek) 828 throws IOException { 829 int pos = dst.position(); 830 int lim = dst.limit(); 831 assert (pos <= lim); 832 int rem = (pos <= lim ? lim - pos : 0); 833 if (dst instanceof DirectBuffer && rem > 0) 834 return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos, peek); 835 836 /* Substitute a native buffer */ 837 int newSize = Math.max(rem, 1); 838 ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize); 839 try { 840 int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0, peek); 841 bb.flip(); 842 if (n > 0 && rem > 0) 843 dst.put(bb); 844 return n; 845 } finally { 846 Util.releaseTemporaryDirectBuffer(bb); 847 } 848 } 849 receiveIntoNativeBuffer(int fd, ResultContainer resultContainer, ByteBuffer bb, int rem, int pos, boolean peek)850 private int receiveIntoNativeBuffer(int fd, 851 ResultContainer resultContainer, 852 ByteBuffer bb, 853 int rem, 854 int pos, 855 boolean peek) 856 throws IOException 857 { 858 int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem, peek); 859 860 if (n > 0) 861 bb.position(pos + n); 862 return n; 863 } 864 865 private InternalNotificationHandler internalNotificationHandler = 866 new InternalNotificationHandler(); 867 handleNotificationInternal(ResultContainer resultContainer)868 private void handleNotificationInternal(ResultContainer resultContainer) 869 { 870 invokeNotificationHandler(resultContainer, 871 internalNotificationHandler, null); 872 } 873 874 private class InternalNotificationHandler 875 extends AbstractNotificationHandler<Object> 876 { 877 @Override handleNotification( AssociationChangeNotification not, Object unused)878 public HandlerResult handleNotification( 879 AssociationChangeNotification not, Object unused) { 880 if (not.event().equals( 881 AssociationChangeNotification.AssocChangeEvent.COMM_UP) && 882 association == null) { 883 AssociationChange sac = (AssociationChange) not; 884 association = new AssociationImpl 885 (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams()); 886 } 887 return HandlerResult.CONTINUE; 888 } 889 } 890 invokeNotificationHandler(ResultContainer resultContainer, NotificationHandler<T> handler, T attachment)891 private <T> HandlerResult invokeNotificationHandler 892 (ResultContainer resultContainer, 893 NotificationHandler<T> handler, 894 T attachment) { 895 SctpNotification notification = resultContainer.notification(); 896 synchronized (stateLock) { 897 notification.setAssociation(association); 898 } 899 900 if (!(handler instanceof AbstractNotificationHandler)) { 901 return handler.handleNotification(notification, attachment); 902 } 903 904 /* AbstractNotificationHandler */ 905 AbstractNotificationHandler<T> absHandler = 906 (AbstractNotificationHandler<T>)handler; 907 switch(resultContainer.type()) { 908 case ASSOCIATION_CHANGED : 909 return absHandler.handleNotification( 910 resultContainer.getAssociationChanged(), attachment); 911 case PEER_ADDRESS_CHANGED : 912 return absHandler.handleNotification( 913 resultContainer.getPeerAddressChanged(), attachment); 914 case SEND_FAILED : 915 return absHandler.handleNotification( 916 resultContainer.getSendFailed(), attachment); 917 case SHUTDOWN : 918 return absHandler.handleNotification( 919 resultContainer.getShutdown(), attachment); 920 default : 921 /* implementation specific handlers */ 922 return absHandler.handleNotification( 923 resultContainer.notification(), attachment); 924 } 925 } 926 checkAssociation(Association sendAssociation)927 private void checkAssociation(Association sendAssociation) { 928 synchronized (stateLock) { 929 if (sendAssociation != null && !sendAssociation.equals(association)) { 930 throw new IllegalArgumentException( 931 "Cannot send to another association"); 932 } 933 } 934 } 935 checkStreamNumber(int streamNumber)936 private void checkStreamNumber(int streamNumber) { 937 synchronized (stateLock) { 938 if (association != null) { 939 if (streamNumber < 0 || 940 streamNumber >= association.maxOutboundStreams()) 941 throw new InvalidStreamException(); 942 } 943 } 944 } 945 946 /* TODO: Add support for ttl and isComplete to both 121 12M 947 * SCTP_EOR not yet supported on reference platforms 948 * TTL support limited... 949 */ 950 @Override send(ByteBuffer buffer, MessageInfo messageInfo)951 public int send(ByteBuffer buffer, MessageInfo messageInfo) 952 throws IOException { 953 if (buffer == null) 954 throw new IllegalArgumentException("buffer cannot be null"); 955 956 if (messageInfo == null) 957 throw new IllegalArgumentException("messageInfo cannot be null"); 958 959 checkAssociation(messageInfo.association()); 960 checkStreamNumber(messageInfo.streamNumber()); 961 962 synchronized (sendLock) { 963 ensureSendOpen(); 964 965 int n = 0; 966 try { 967 begin(); 968 969 synchronized (stateLock) { 970 if(!isOpen()) 971 return 0; 972 senderThread = NativeThread.current(); 973 } 974 975 do { 976 n = send(fdVal, buffer, messageInfo); 977 } while ((n == IOStatus.INTERRUPTED) && isOpen()); 978 979 return IOStatus.normalize(n); 980 } finally { 981 senderCleanup(); 982 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 983 assert IOStatus.check(n); 984 } 985 } 986 } 987 send(int fd, ByteBuffer src, MessageInfo messageInfo)988 private int send(int fd, ByteBuffer src, MessageInfo messageInfo) 989 throws IOException { 990 int streamNumber = messageInfo.streamNumber(); 991 SocketAddress target = messageInfo.address(); 992 boolean unordered = messageInfo.isUnordered(); 993 int ppid = messageInfo.payloadProtocolID(); 994 995 if (src instanceof DirectBuffer) 996 return sendFromNativeBuffer(fd, src, target, streamNumber, 997 unordered, ppid); 998 999 /* Substitute a native buffer */ 1000 int pos = src.position(); 1001 int lim = src.limit(); 1002 assert (pos <= lim && streamNumber >= 0); 1003 1004 int rem = (pos <= lim ? lim - pos : 0); 1005 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 1006 try { 1007 bb.put(src); 1008 bb.flip(); 1009 /* Do not update src until we see how many bytes were written */ 1010 src.position(pos); 1011 1012 int n = sendFromNativeBuffer(fd, bb, target, streamNumber, 1013 unordered, ppid); 1014 if (n > 0) { 1015 /* now update src */ 1016 src.position(pos + n); 1017 } 1018 return n; 1019 } finally { 1020 Util.releaseTemporaryDirectBuffer(bb); 1021 } 1022 } 1023 sendFromNativeBuffer(int fd, ByteBuffer bb, SocketAddress target, int streamNumber, boolean unordered, int ppid)1024 private int sendFromNativeBuffer(int fd, 1025 ByteBuffer bb, 1026 SocketAddress target, 1027 int streamNumber, 1028 boolean unordered, 1029 int ppid) 1030 throws IOException { 1031 InetAddress addr = null; // no preferred address 1032 int port = 0; 1033 if (target != null) { 1034 InetSocketAddress isa = Net.checkAddress(target); 1035 addr = isa.getAddress(); 1036 port = isa.getPort(); 1037 } 1038 1039 int pos = bb.position(); 1040 int lim = bb.limit(); 1041 assert (pos <= lim); 1042 int rem = (pos <= lim ? lim - pos : 0); 1043 1044 int written = send0(fd, ((DirectBuffer)bb).address() + pos, rem, addr, 1045 port, -1 /*121*/, streamNumber, unordered, ppid); 1046 if (written > 0) 1047 bb.position(pos + written); 1048 return written; 1049 } 1050 1051 @Override shutdown()1052 public SctpChannel shutdown() throws IOException { 1053 synchronized(stateLock) { 1054 if (isShutdown) 1055 return this; 1056 1057 ensureSendOpen(); 1058 SctpNet.shutdown(fdVal, -1); 1059 if (senderThread != 0) 1060 NativeThread.signal(senderThread); 1061 isShutdown = true; 1062 } 1063 return this; 1064 } 1065 1066 @Override getAllLocalAddresses()1067 public Set<SocketAddress> getAllLocalAddresses() 1068 throws IOException { 1069 synchronized (stateLock) { 1070 if (!isOpen()) 1071 throw new ClosedChannelException(); 1072 if (!isBound()) 1073 return Collections.emptySet(); 1074 1075 return SctpNet.getLocalAddresses(fdVal); 1076 } 1077 } 1078 1079 @Override getRemoteAddresses()1080 public Set<SocketAddress> getRemoteAddresses() 1081 throws IOException { 1082 synchronized (stateLock) { 1083 if (!isOpen()) 1084 throw new ClosedChannelException(); 1085 if (!isConnected() || isShutdown) 1086 return Collections.emptySet(); 1087 1088 try { 1089 return SctpNet.getRemoteAddresses(fdVal, 0/*unused*/); 1090 } catch (SocketException unused) { 1091 /* an open connected channel should always have remote addresses */ 1092 return remoteAddresses; 1093 } 1094 } 1095 } 1096 1097 /* Native */ initIDs()1098 private static native void initIDs(); 1099 receive0(int fd, ResultContainer resultContainer, long address, int length, boolean peek)1100 static native int receive0(int fd, ResultContainer resultContainer, 1101 long address, int length, boolean peek) throws IOException; 1102 send0(int fd, long address, int length, InetAddress addr, int port, int assocId, int streamNumber, boolean unordered, int ppid)1103 static native int send0(int fd, long address, int length, 1104 InetAddress addr, int port, int assocId, int streamNumber, 1105 boolean unordered, int ppid) throws IOException; 1106 checkConnect(FileDescriptor fd, boolean block, boolean ready)1107 private static native int checkConnect(FileDescriptor fd, boolean block, 1108 boolean ready) throws IOException; 1109 1110 static { IOUtil.load()1111 IOUtil.load(); /* loads nio & net native libraries */ java.security.AccessController.doPrivileged( new java.security.PrivilegedAction<Void>() { public Void run() { System.loadLibrary(R); return null; } })1112 java.security.AccessController.doPrivileged( 1113 new java.security.PrivilegedAction<Void>() { 1114 public Void run() { 1115 System.loadLibrary("sctp"); 1116 return null; 1117 } 1118 }); initIDs()1119 initIDs(); 1120 } 1121 } 1122