1 /*
2  * Copyright (c) 2009, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package sun.nio.ch.sctp;
26 
27 import java.net.InetAddress;
28 import java.net.SocketAddress;
29 import java.net.SocketException;
30 import java.net.InetSocketAddress;
31 import java.io.FileDescriptor;
32 import java.io.IOException;
33 import java.util.Collections;
34 import java.util.Set;
35 import java.util.HashSet;
36 import java.nio.ByteBuffer;
37 import java.nio.channels.SelectionKey;
38 import java.nio.channels.ClosedChannelException;
39 import java.nio.channels.ConnectionPendingException;
40 import java.nio.channels.NoConnectionPendingException;
41 import java.nio.channels.AlreadyConnectedException;
42 import java.nio.channels.NotYetBoundException;
43 import java.nio.channels.NotYetConnectedException;
44 import java.nio.channels.spi.SelectorProvider;
45 import com.sun.nio.sctp.AbstractNotificationHandler;
46 import com.sun.nio.sctp.Association;
47 import com.sun.nio.sctp.AssociationChangeNotification;
48 import com.sun.nio.sctp.HandlerResult;
49 import com.sun.nio.sctp.IllegalReceiveException;
50 import com.sun.nio.sctp.InvalidStreamException;
51 import com.sun.nio.sctp.IllegalUnbindException;
52 import com.sun.nio.sctp.MessageInfo;
53 import com.sun.nio.sctp.NotificationHandler;
54 import com.sun.nio.sctp.SctpChannel;
55 import com.sun.nio.sctp.SctpSocketOption;
56 import sun.nio.ch.DirectBuffer;
57 import sun.nio.ch.IOStatus;
58 import sun.nio.ch.IOUtil;
59 import sun.nio.ch.NativeThread;
60 import sun.nio.ch.Net;
61 import sun.nio.ch.SelChImpl;
62 import sun.nio.ch.SelectionKeyImpl;
63 import sun.nio.ch.Util;
64 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
65 import static sun.nio.ch.sctp.ResultContainer.SEND_FAILED;
66 import static sun.nio.ch.sctp.ResultContainer.ASSOCIATION_CHANGED;
67 import static sun.nio.ch.sctp.ResultContainer.PEER_ADDRESS_CHANGED;
68 import static sun.nio.ch.sctp.ResultContainer.SHUTDOWN;
69 
70 /**
71  * An implementation of an SctpChannel
72  */
73 public class SctpChannelImpl extends SctpChannel
74     implements SelChImpl
75 {
76     private final FileDescriptor fd;
77 
78     private final int fdVal;
79 
80     /* IDs of native threads doing send and receivess, for signalling */
81     private volatile long receiverThread = 0;
82     private volatile long senderThread = 0;
83 
84     /* Lock held by current receiving or connecting thread */
85     private final Object receiveLock = new Object();
86 
87     /* Lock held by current sending or connecting thread */
88     private final Object sendLock = new Object();
89 
90     private final ThreadLocal<Boolean> receiveInvoked =
91         new ThreadLocal<Boolean>() {
92              @Override protected Boolean initialValue() {
93                  return Boolean.FALSE;
94             }
95     };
96 
97     /* Lock held by any thread that modifies the state fields declared below
98        DO NOT invoke a blocking I/O operation while holding this lock! */
99     private final Object stateLock = new Object();
100 
101     private enum ChannelState {
102         UNINITIALIZED,
103         UNCONNECTED,
104         PENDING,
105         CONNECTED,
106         KILLPENDING,
107         KILLED,
108     }
109     /* -- The following fields are protected by stateLock -- */
110     private ChannelState state = ChannelState.UNINITIALIZED;
111 
112     /* Binding; Once bound the port will remain constant. */
113     int port = -1;
114     private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>();
115     /* Has the channel been bound to the wildcard address */
116     private boolean wildcard; /* false */
117     //private InetSocketAddress remoteAddress = null;
118 
119     /* Input/Output open */
120     private boolean readyToConnect;
121 
122     /* Shutdown */
123     private boolean isShutdown;
124 
125     private Association association;
126 
127     private Set<SocketAddress> remoteAddresses = Collections.emptySet();
128 
129     /* -- End of fields protected by stateLock -- */
130 
131     /**
132      * Constructor for normal connecting sockets
133      */
SctpChannelImpl(SelectorProvider provider)134     public SctpChannelImpl(SelectorProvider provider) throws IOException {
135         //TODO: update provider remove public modifier
136         super(provider);
137         this.fd = SctpNet.socket(true);
138         this.fdVal = IOUtil.fdVal(fd);
139         this.state = ChannelState.UNCONNECTED;
140     }
141 
142     /**
143      * Constructor for sockets obtained from server sockets
144      */
SctpChannelImpl(SelectorProvider provider, FileDescriptor fd)145     public SctpChannelImpl(SelectorProvider provider, FileDescriptor fd)
146          throws IOException {
147         this(provider, fd, null);
148     }
149 
150     /**
151      * Constructor for sockets obtained from branching
152      */
SctpChannelImpl(SelectorProvider provider, FileDescriptor fd, Association association)153     public SctpChannelImpl(SelectorProvider provider,
154                            FileDescriptor fd,
155                            Association association)
156             throws IOException {
157         super(provider);
158         this.fd = fd;
159         this.fdVal = IOUtil.fdVal(fd);
160         this.state = ChannelState.CONNECTED;
161         port = (Net.localAddress(fd)).getPort();
162 
163         if (association != null) { /* branched */
164             this.association = association;
165         } else { /* obtained from server channel */
166             /* Receive COMM_UP */
167             ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
168             try {
169                 receive(buf, null, null, true);
170             } finally {
171                 Util.releaseTemporaryDirectBuffer(buf);
172             }
173         }
174     }
175 
176     /**
177      * Binds the channel's socket to a local address.
178      */
179     @Override
bind(SocketAddress local)180     public SctpChannel bind(SocketAddress local) throws IOException {
181         synchronized (receiveLock) {
182             synchronized (sendLock) {
183                 synchronized (stateLock) {
184                     ensureOpenAndUnconnected();
185                     if (isBound())
186                         SctpNet.throwAlreadyBoundException();
187                     InetSocketAddress isa = (local == null) ?
188                         new InetSocketAddress(0) : Net.checkAddress(local);
189                     SecurityManager sm = System.getSecurityManager();
190                     if (sm != null) {
191                         sm.checkListen(isa.getPort());
192                     }
193                     Net.bind(fd, isa.getAddress(), isa.getPort());
194                     InetSocketAddress boundIsa = Net.localAddress(fd);
195                     port = boundIsa.getPort();
196                     localAddresses.add(isa);
197                     if (isa.getAddress().isAnyLocalAddress())
198                         wildcard = true;
199                 }
200             }
201         }
202         return this;
203     }
204 
205     @Override
bindAddress(InetAddress address)206     public SctpChannel bindAddress(InetAddress address)
207             throws IOException {
208         bindUnbindAddress(address, true);
209         localAddresses.add(new InetSocketAddress(address, port));
210         return this;
211     }
212 
213     @Override
unbindAddress(InetAddress address)214     public SctpChannel unbindAddress(InetAddress address)
215             throws IOException {
216         bindUnbindAddress(address, false);
217         localAddresses.remove(new InetSocketAddress(address, port));
218         return this;
219     }
220 
bindUnbindAddress(InetAddress address, boolean add)221     private SctpChannel bindUnbindAddress(InetAddress address, boolean add)
222             throws IOException {
223         if (address == null)
224             throw new IllegalArgumentException();
225 
226         synchronized (receiveLock) {
227             synchronized (sendLock) {
228                 synchronized (stateLock) {
229                     if (!isOpen())
230                         throw new ClosedChannelException();
231                     if (!isBound())
232                         throw new NotYetBoundException();
233                     if (wildcard)
234                         throw new IllegalStateException(
235                                 "Cannot add or remove addresses from a channel that is bound to the wildcard address");
236                     if (address.isAnyLocalAddress())
237                         throw new IllegalArgumentException(
238                                 "Cannot add or remove the wildcard address");
239                     if (add) {
240                         for (InetSocketAddress addr : localAddresses) {
241                             if (addr.getAddress().equals(address)) {
242                                 SctpNet.throwAlreadyBoundException();
243                             }
244                         }
245                     } else { /*removing */
246                         /* Verify that there is more than one address
247                          * and that address is already bound */
248                         if (localAddresses.size() <= 1)
249                             throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound");
250                         boolean foundAddress = false;
251                         for (InetSocketAddress addr : localAddresses) {
252                             if (addr.getAddress().equals(address)) {
253                                 foundAddress = true;
254                                 break;
255                             }
256                         }
257                         if (!foundAddress )
258                             throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address");
259                     }
260 
261                     SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add);
262 
263                     /* Update our internal Set to reflect the addition/removal */
264                     if (add)
265                         localAddresses.add(new InetSocketAddress(address, port));
266                     else {
267                         for (InetSocketAddress addr : localAddresses) {
268                             if (addr.getAddress().equals(address)) {
269                                 localAddresses.remove(addr);
270                                 break;
271                             }
272                         }
273                     }
274                 }
275             }
276         }
277         return this;
278     }
279 
isBound()280     private boolean isBound() {
281         synchronized (stateLock) {
282             return port == -1 ? false : true;
283         }
284     }
285 
isConnected()286     private boolean isConnected() {
287         synchronized (stateLock) {
288             return (state == ChannelState.CONNECTED);
289         }
290     }
291 
ensureOpenAndUnconnected()292     private void ensureOpenAndUnconnected() throws IOException {
293         synchronized (stateLock) {
294             if (!isOpen())
295                 throw new ClosedChannelException();
296             if (isConnected())
297                 throw new AlreadyConnectedException();
298             if (state == ChannelState.PENDING)
299                 throw new ConnectionPendingException();
300         }
301     }
302 
ensureReceiveOpen()303     private boolean ensureReceiveOpen() throws ClosedChannelException {
304         synchronized (stateLock) {
305             if (!isOpen())
306                 throw new ClosedChannelException();
307             if (!isConnected())
308                 throw new NotYetConnectedException();
309             else
310                 return true;
311         }
312     }
313 
ensureSendOpen()314     private void ensureSendOpen() throws ClosedChannelException {
315         synchronized (stateLock) {
316             if (!isOpen())
317                 throw new ClosedChannelException();
318             if (isShutdown)
319                 throw new ClosedChannelException();
320             if (!isConnected())
321                 throw new NotYetConnectedException();
322         }
323     }
324 
receiverCleanup()325     private void receiverCleanup() throws IOException {
326         synchronized (stateLock) {
327             receiverThread = 0;
328             if (state == ChannelState.KILLPENDING)
329                 kill();
330         }
331     }
332 
senderCleanup()333     private void senderCleanup() throws IOException {
334         synchronized (stateLock) {
335             senderThread = 0;
336             if (state == ChannelState.KILLPENDING)
337                 kill();
338         }
339     }
340 
341     @Override
association()342     public Association association() throws ClosedChannelException {
343         synchronized (stateLock) {
344             if (!isOpen())
345                 throw new ClosedChannelException();
346             if (!isConnected())
347                 return null;
348 
349             return association;
350         }
351     }
352 
353     @Override
connect(SocketAddress endpoint)354     public boolean connect(SocketAddress endpoint) throws IOException {
355         synchronized (receiveLock) {
356             synchronized (sendLock) {
357                 ensureOpenAndUnconnected();
358                 InetSocketAddress isa = Net.checkAddress(endpoint);
359                 SecurityManager sm = System.getSecurityManager();
360                 if (sm != null)
361                     sm.checkConnect(isa.getAddress().getHostAddress(),
362                                     isa.getPort());
363                 synchronized (blockingLock()) {
364                     int n = 0;
365                     try {
366                         try {
367                             begin();
368                             synchronized (stateLock) {
369                                 if (!isOpen()) {
370                                     return false;
371                                 }
372                                 receiverThread = NativeThread.current();
373                             }
374                             for (;;) {
375                                 InetAddress ia = isa.getAddress();
376                                 if (ia.isAnyLocalAddress())
377                                     ia = InetAddress.getLocalHost();
378                                 n = SctpNet.connect(fdVal, ia, isa.getPort());
379                                 if (  (n == IOStatus.INTERRUPTED)
380                                       && isOpen())
381                                     continue;
382                                 break;
383                             }
384                         } finally {
385                             receiverCleanup();
386                             end((n > 0) || (n == IOStatus.UNAVAILABLE));
387                             assert IOStatus.check(n);
388                         }
389                     } catch (IOException x) {
390                         /* If an exception was thrown, close the channel after
391                          * invoking end() so as to avoid bogus
392                          * AsynchronousCloseExceptions */
393                         close();
394                         throw x;
395                     }
396 
397                     if (n > 0) {
398                         synchronized (stateLock) {
399                             /* Connection succeeded */
400                             state = ChannelState.CONNECTED;
401                             if (!isBound()) {
402                                 InetSocketAddress boundIsa =
403                                         Net.localAddress(fd);
404                                 port = boundIsa.getPort();
405                             }
406 
407                             /* Receive COMM_UP */
408                             ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
409                             try {
410                                 receive(buf, null, null, true);
411                             } finally {
412                                 Util.releaseTemporaryDirectBuffer(buf);
413                             }
414 
415                             /* cache remote addresses */
416                             try {
417                                 remoteAddresses = getRemoteAddresses();
418                             } catch (IOException unused) { /* swallow exception */ }
419 
420                             return true;
421                         }
422                     } else  {
423                         synchronized (stateLock) {
424                             /* If nonblocking and no exception then connection
425                              * pending; disallow another invocation */
426                             if (!isBlocking())
427                                 state = ChannelState.PENDING;
428                             else
429                                 assert false;
430                         }
431                     }
432                 }
433                 return false;
434             }
435         }
436     }
437 
438     @Override
connect(SocketAddress endpoint, int maxOutStreams, int maxInStreams)439     public boolean connect(SocketAddress endpoint,
440                            int maxOutStreams,
441                            int maxInStreams)
442             throws IOException {
443         ensureOpenAndUnconnected();
444         return setOption(SCTP_INIT_MAXSTREAMS, InitMaxStreams.
445                 create(maxInStreams, maxOutStreams)).connect(endpoint);
446 
447     }
448 
449     @Override
isConnectionPending()450     public boolean isConnectionPending() {
451         synchronized (stateLock) {
452             return (state == ChannelState.PENDING);
453         }
454     }
455 
456     @Override
finishConnect()457     public boolean finishConnect() throws IOException {
458         synchronized (receiveLock) {
459             synchronized (sendLock) {
460                 synchronized (stateLock) {
461                     if (!isOpen())
462                         throw new ClosedChannelException();
463                     if (isConnected())
464                         return true;
465                     if (state != ChannelState.PENDING)
466                         throw new NoConnectionPendingException();
467                 }
468                 int n = 0;
469                 try {
470                     try {
471                         begin();
472                         synchronized (blockingLock()) {
473                             synchronized (stateLock) {
474                                 if (!isOpen()) {
475                                     return false;
476                                 }
477                                 receiverThread = NativeThread.current();
478                             }
479                             if (!isBlocking()) {
480                                 for (;;) {
481                                     n = checkConnect(fd, false, readyToConnect);
482                                     if (  (n == IOStatus.INTERRUPTED)
483                                           && isOpen())
484                                         continue;
485                                     break;
486                                 }
487                             } else {
488                                 for (;;) {
489                                     n = checkConnect(fd, true, readyToConnect);
490                                     if (n == 0) {
491                                         // Loop in case of
492                                         // spurious notifications
493                                         continue;
494                                     }
495                                     if (  (n == IOStatus.INTERRUPTED)
496                                           && isOpen())
497                                         continue;
498                                     break;
499                                 }
500                             }
501                         }
502                     } finally {
503                         synchronized (stateLock) {
504                             receiverThread = 0;
505                             if (state == ChannelState.KILLPENDING) {
506                                 kill();
507                                 /* poll()/getsockopt() does not report
508                                  * error (throws exception, with n = 0)
509                                  * on Linux platform after dup2 and
510                                  * signal-wakeup. Force n to 0 so the
511                                  * end() can throw appropriate exception */
512                                 n = 0;
513                             }
514                         }
515                         end((n > 0) || (n == IOStatus.UNAVAILABLE));
516                         assert IOStatus.check(n);
517                     }
518                 } catch (IOException x) {
519                     /* If an exception was thrown, close the channel after
520                      * invoking end() so as to avoid bogus
521                      * AsynchronousCloseExceptions */
522                     close();
523                     throw x;
524                 }
525 
526                 if (n > 0) {
527                     synchronized (stateLock) {
528                         state = ChannelState.CONNECTED;
529                         if (!isBound()) {
530                             InetSocketAddress boundIsa =
531                                     Net.localAddress(fd);
532                             port = boundIsa.getPort();
533                         }
534 
535                         /* Receive COMM_UP */
536                         ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
537                         try {
538                             receive(buf, null, null, true);
539                         } finally {
540                             Util.releaseTemporaryDirectBuffer(buf);
541                         }
542 
543                         /* cache remote addresses */
544                         try {
545                             remoteAddresses = getRemoteAddresses();
546                         } catch (IOException unused) { /* swallow exception */ }
547 
548                         return true;
549                     }
550                 }
551             }
552         }
553         return false;
554     }
555 
556     @Override
implConfigureBlocking(boolean block)557     protected void implConfigureBlocking(boolean block) throws IOException {
558         IOUtil.configureBlocking(fd, block);
559     }
560 
561     @Override
implCloseSelectableChannel()562     public void implCloseSelectableChannel() throws IOException {
563         synchronized (stateLock) {
564             SctpNet.preClose(fdVal);
565 
566             if (receiverThread != 0)
567                 NativeThread.signal(receiverThread);
568 
569             if (senderThread != 0)
570                 NativeThread.signal(senderThread);
571 
572             if (!isRegistered())
573                 kill();
574         }
575     }
576 
577     @Override
getFD()578     public FileDescriptor getFD() {
579         return fd;
580     }
581 
582     @Override
getFDVal()583     public int getFDVal() {
584         return fdVal;
585     }
586 
587     /**
588      * Translates native poll revent ops into a ready operation ops
589      */
translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk)590     private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {
591         int intOps = sk.nioInterestOps();
592         int oldOps = sk.nioReadyOps();
593         int newOps = initialOps;
594 
595         if ((ops & Net.POLLNVAL) != 0) {
596             /* This should only happen if this channel is pre-closed while a
597              * selection operation is in progress
598              * ## Throw an error if this channel has not been pre-closed */
599             return false;
600         }
601 
602         if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
603             newOps = intOps;
604             sk.nioReadyOps(newOps);
605             /* No need to poll again in checkConnect,
606              * the error will be detected there */
607             readyToConnect = true;
608             return (newOps & ~oldOps) != 0;
609         }
610 
611         if (((ops & Net.POLLIN) != 0) &&
612             ((intOps & SelectionKey.OP_READ) != 0) &&
613             isConnected())
614             newOps |= SelectionKey.OP_READ;
615 
616         if (((ops & Net.POLLCONN) != 0) &&
617             ((intOps & SelectionKey.OP_CONNECT) != 0) &&
618             ((state == ChannelState.UNCONNECTED) || (state == ChannelState.PENDING))) {
619             newOps |= SelectionKey.OP_CONNECT;
620             readyToConnect = true;
621         }
622 
623         if (((ops & Net.POLLOUT) != 0) &&
624             ((intOps & SelectionKey.OP_WRITE) != 0) &&
625             isConnected())
626             newOps |= SelectionKey.OP_WRITE;
627 
628         sk.nioReadyOps(newOps);
629         return (newOps & ~oldOps) != 0;
630     }
631 
632     @Override
translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk)633     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
634         return translateReadyOps(ops, sk.nioReadyOps(), sk);
635     }
636 
637     @Override
638     @SuppressWarnings("all")
translateAndSetReadyOps(int ops, SelectionKeyImpl sk)639     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
640         return translateReadyOps(ops, 0, sk);
641     }
642 
643     @Override
translateInterestOps(int ops)644     public int translateInterestOps(int ops) {
645         int newOps = 0;
646         if ((ops & SelectionKey.OP_READ) != 0)
647             newOps |= Net.POLLIN;
648         if ((ops & SelectionKey.OP_WRITE) != 0)
649             newOps |= Net.POLLOUT;
650         if ((ops & SelectionKey.OP_CONNECT) != 0)
651             newOps |= Net.POLLCONN;
652         return newOps;
653     }
654 
655     @Override
kill()656     public void kill() throws IOException {
657         synchronized (stateLock) {
658             if (state == ChannelState.KILLED)
659                 return;
660             if (state == ChannelState.UNINITIALIZED) {
661                 state = ChannelState.KILLED;
662                 return;
663             }
664             assert !isOpen() && !isRegistered();
665 
666             /* Postpone the kill if there is a waiting reader
667              * or writer thread. */
668             if (receiverThread == 0 && senderThread == 0) {
669                 SctpNet.close(fdVal);
670                 state = ChannelState.KILLED;
671             } else {
672                 state = ChannelState.KILLPENDING;
673             }
674         }
675     }
676 
677     @Override
setOption(SctpSocketOption<T> name, T value)678     public <T> SctpChannel setOption(SctpSocketOption<T> name, T value)
679             throws IOException {
680         if (name == null)
681             throw new NullPointerException();
682         if (!supportedOptions().contains(name))
683             throw new UnsupportedOperationException("'" + name + "' not supported");
684 
685         synchronized (stateLock) {
686             if (!isOpen())
687                 throw new ClosedChannelException();
688 
689             SctpNet.setSocketOption(fdVal, name, value, 0 /*oneToOne*/);
690         }
691         return this;
692     }
693 
694     @Override
695     @SuppressWarnings("unchecked")
getOption(SctpSocketOption<T> name)696     public <T> T getOption(SctpSocketOption<T> name) throws IOException {
697         if (name == null)
698             throw new NullPointerException();
699         if (!supportedOptions().contains(name))
700             throw new UnsupportedOperationException("'" + name + "' not supported");
701 
702         synchronized (stateLock) {
703             if (!isOpen())
704                 throw new ClosedChannelException();
705 
706             return (T)SctpNet.getSocketOption(fdVal, name, 0 /*oneToOne*/);
707         }
708     }
709 
710     private static class DefaultOptionsHolder {
711         static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions();
712 
defaultOptions()713         private static Set<SctpSocketOption<?>> defaultOptions() {
714             HashSet<SctpSocketOption<?>> set = new HashSet<SctpSocketOption<?>>(10);
715             set.add(SCTP_DISABLE_FRAGMENTS);
716             set.add(SCTP_EXPLICIT_COMPLETE);
717             set.add(SCTP_FRAGMENT_INTERLEAVE);
718             set.add(SCTP_INIT_MAXSTREAMS);
719             set.add(SCTP_NODELAY);
720             set.add(SCTP_PRIMARY_ADDR);
721             set.add(SCTP_SET_PEER_PRIMARY_ADDR);
722             set.add(SO_SNDBUF);
723             set.add(SO_RCVBUF);
724             set.add(SO_LINGER);
725             return Collections.unmodifiableSet(set);
726         }
727     }
728 
729     @Override
supportedOptions()730     public final Set<SctpSocketOption<?>> supportedOptions() {
731         return DefaultOptionsHolder.defaultOptions;
732     }
733 
734     @Override
receive(ByteBuffer buffer, T attachment, NotificationHandler<T> handler)735     public <T> MessageInfo receive(ByteBuffer buffer,
736                                    T attachment,
737                                    NotificationHandler<T> handler)
738             throws IOException {
739         return receive(buffer, attachment, handler, false);
740     }
741 
receive(ByteBuffer buffer, T attachment, NotificationHandler<T> handler, boolean fromConnect)742     private <T> MessageInfo receive(ByteBuffer buffer,
743                                     T attachment,
744                                     NotificationHandler<T> handler,
745                                     boolean fromConnect)
746             throws IOException {
747         if (buffer == null)
748             throw new IllegalArgumentException("buffer cannot be null");
749 
750         if (buffer.isReadOnly())
751             throw new IllegalArgumentException("Read-only buffer");
752 
753         if (receiveInvoked.get())
754             throw new IllegalReceiveException(
755                     "cannot invoke receive from handler");
756         receiveInvoked.set(Boolean.TRUE);
757 
758         try {
759             ResultContainer resultContainer = new ResultContainer();
760             do {
761                 resultContainer.clear();
762                 synchronized (receiveLock) {
763                     if (!ensureReceiveOpen())
764                         return null;
765 
766                     int n = 0;
767                     try {
768                         begin();
769 
770                         synchronized (stateLock) {
771                             if(!isOpen())
772                                 return null;
773                             receiverThread = NativeThread.current();
774                         }
775 
776                         do {
777                             n = receive(fdVal, buffer, resultContainer, fromConnect);
778                         } while ((n == IOStatus.INTERRUPTED) && isOpen());
779                     } finally {
780                         receiverCleanup();
781                         end((n > 0) || (n == IOStatus.UNAVAILABLE));
782                         assert IOStatus.check(n);
783                     }
784 
785                     if (!resultContainer.isNotification()) {
786                         /* message or nothing */
787                         if (resultContainer.hasSomething()) {
788                             /* Set the association before returning */
789                             MessageInfoImpl info =
790                                     resultContainer.getMessageInfo();
791                             synchronized (stateLock) {
792                                 assert association != null;
793                                 info.setAssociation(association);
794                             }
795                             return info;
796                         } else
797                             /* Non-blocking may return null if nothing available*/
798                             return null;
799                     } else { /* notification */
800                         synchronized (stateLock) {
801                             handleNotificationInternal(
802                                     resultContainer);
803                         }
804                     }
805 
806                     if (fromConnect)  {
807                         /* If we reach here, then it was connect that invoked
808                          * receive and received the COMM_UP. We have already
809                          * handled the COMM_UP with the internal notification
810                          * handler. Simply return. */
811                         return null;
812                     }
813                 }  /* receiveLock */
814             } while (handler == null ? true :
815                 (invokeNotificationHandler(resultContainer, handler, attachment)
816                  == HandlerResult.CONTINUE));
817 
818             return null;
819         } finally {
820             receiveInvoked.set(Boolean.FALSE);
821         }
822     }
823 
receive(int fd, ByteBuffer dst, ResultContainer resultContainer, boolean peek)824     private int receive(int fd,
825                         ByteBuffer dst,
826                         ResultContainer resultContainer,
827                         boolean peek)
828             throws IOException {
829         int pos = dst.position();
830         int lim = dst.limit();
831         assert (pos <= lim);
832         int rem = (pos <= lim ? lim - pos : 0);
833         if (dst instanceof DirectBuffer && rem > 0)
834             return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos, peek);
835 
836         /* Substitute a native buffer */
837         int newSize = Math.max(rem, 1);
838         ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
839         try {
840             int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0, peek);
841             bb.flip();
842             if (n > 0 && rem > 0)
843                 dst.put(bb);
844             return n;
845         } finally {
846             Util.releaseTemporaryDirectBuffer(bb);
847         }
848     }
849 
receiveIntoNativeBuffer(int fd, ResultContainer resultContainer, ByteBuffer bb, int rem, int pos, boolean peek)850     private int receiveIntoNativeBuffer(int fd,
851                                         ResultContainer resultContainer,
852                                         ByteBuffer bb,
853                                         int rem,
854                                         int pos,
855                                         boolean peek)
856         throws IOException
857     {
858         int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem, peek);
859 
860         if (n > 0)
861             bb.position(pos + n);
862         return n;
863     }
864 
865     private InternalNotificationHandler internalNotificationHandler =
866             new InternalNotificationHandler();
867 
handleNotificationInternal(ResultContainer resultContainer)868     private void handleNotificationInternal(ResultContainer resultContainer)
869     {
870         invokeNotificationHandler(resultContainer,
871                 internalNotificationHandler, null);
872     }
873 
874     private class InternalNotificationHandler
875             extends AbstractNotificationHandler<Object>
876     {
877         @Override
handleNotification( AssociationChangeNotification not, Object unused)878         public HandlerResult handleNotification(
879                 AssociationChangeNotification not, Object unused) {
880             if (not.event().equals(
881                     AssociationChangeNotification.AssocChangeEvent.COMM_UP) &&
882                     association == null) {
883                 AssociationChange sac = (AssociationChange) not;
884                 association = new AssociationImpl
885                        (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams());
886             }
887             return HandlerResult.CONTINUE;
888         }
889     }
890 
invokeNotificationHandler(ResultContainer resultContainer, NotificationHandler<T> handler, T attachment)891     private <T> HandlerResult invokeNotificationHandler
892                                  (ResultContainer resultContainer,
893                                   NotificationHandler<T> handler,
894                                   T attachment) {
895         SctpNotification notification = resultContainer.notification();
896         synchronized (stateLock) {
897             notification.setAssociation(association);
898         }
899 
900         if (!(handler instanceof AbstractNotificationHandler)) {
901             return handler.handleNotification(notification, attachment);
902         }
903 
904         /* AbstractNotificationHandler */
905         AbstractNotificationHandler<T> absHandler =
906                 (AbstractNotificationHandler<T>)handler;
907         switch(resultContainer.type()) {
908             case ASSOCIATION_CHANGED :
909                 return absHandler.handleNotification(
910                         resultContainer.getAssociationChanged(), attachment);
911             case PEER_ADDRESS_CHANGED :
912                 return absHandler.handleNotification(
913                         resultContainer.getPeerAddressChanged(), attachment);
914             case SEND_FAILED :
915                 return absHandler.handleNotification(
916                         resultContainer.getSendFailed(), attachment);
917             case SHUTDOWN :
918                 return absHandler.handleNotification(
919                         resultContainer.getShutdown(), attachment);
920             default :
921                 /* implementation specific handlers */
922                 return absHandler.handleNotification(
923                         resultContainer.notification(), attachment);
924         }
925     }
926 
checkAssociation(Association sendAssociation)927     private void checkAssociation(Association sendAssociation) {
928         synchronized (stateLock) {
929             if (sendAssociation != null && !sendAssociation.equals(association)) {
930                 throw new IllegalArgumentException(
931                         "Cannot send to another association");
932             }
933         }
934     }
935 
checkStreamNumber(int streamNumber)936     private void checkStreamNumber(int streamNumber) {
937         synchronized (stateLock) {
938             if (association != null) {
939                 if (streamNumber < 0 ||
940                       streamNumber >= association.maxOutboundStreams())
941                     throw new InvalidStreamException();
942             }
943         }
944     }
945 
946     /* TODO: Add support for ttl and isComplete to both 121 12M
947      *       SCTP_EOR not yet supported on reference platforms
948      *       TTL support limited...
949      */
950     @Override
send(ByteBuffer buffer, MessageInfo messageInfo)951     public int send(ByteBuffer buffer, MessageInfo messageInfo)
952             throws IOException {
953         if (buffer == null)
954             throw new IllegalArgumentException("buffer cannot be null");
955 
956         if (messageInfo == null)
957             throw new IllegalArgumentException("messageInfo cannot be null");
958 
959         checkAssociation(messageInfo.association());
960         checkStreamNumber(messageInfo.streamNumber());
961 
962         synchronized (sendLock) {
963             ensureSendOpen();
964 
965             int n = 0;
966             try {
967                 begin();
968 
969                 synchronized (stateLock) {
970                     if(!isOpen())
971                         return 0;
972                     senderThread = NativeThread.current();
973                 }
974 
975                 do {
976                     n = send(fdVal, buffer, messageInfo);
977                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
978 
979                 return IOStatus.normalize(n);
980             } finally {
981                 senderCleanup();
982                 end((n > 0) || (n == IOStatus.UNAVAILABLE));
983                 assert IOStatus.check(n);
984             }
985         }
986     }
987 
send(int fd, ByteBuffer src, MessageInfo messageInfo)988     private int send(int fd, ByteBuffer src, MessageInfo messageInfo)
989             throws IOException {
990         int streamNumber = messageInfo.streamNumber();
991         SocketAddress target = messageInfo.address();
992         boolean unordered = messageInfo.isUnordered();
993         int ppid = messageInfo.payloadProtocolID();
994 
995         if (src instanceof DirectBuffer)
996             return sendFromNativeBuffer(fd, src, target, streamNumber,
997                     unordered, ppid);
998 
999         /* Substitute a native buffer */
1000         int pos = src.position();
1001         int lim = src.limit();
1002         assert (pos <= lim && streamNumber >= 0);
1003 
1004         int rem = (pos <= lim ? lim - pos : 0);
1005         ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
1006         try {
1007             bb.put(src);
1008             bb.flip();
1009             /* Do not update src until we see how many bytes were written */
1010             src.position(pos);
1011 
1012             int n = sendFromNativeBuffer(fd, bb, target, streamNumber,
1013                     unordered, ppid);
1014             if (n > 0) {
1015                 /* now update src */
1016                 src.position(pos + n);
1017             }
1018             return n;
1019         } finally {
1020             Util.releaseTemporaryDirectBuffer(bb);
1021         }
1022     }
1023 
sendFromNativeBuffer(int fd, ByteBuffer bb, SocketAddress target, int streamNumber, boolean unordered, int ppid)1024     private int sendFromNativeBuffer(int fd,
1025                                      ByteBuffer bb,
1026                                      SocketAddress target,
1027                                      int streamNumber,
1028                                      boolean unordered,
1029                                      int ppid)
1030             throws IOException {
1031         InetAddress addr = null;     // no preferred address
1032         int port = 0;
1033         if (target != null) {
1034             InetSocketAddress isa = Net.checkAddress(target);
1035             addr = isa.getAddress();
1036             port = isa.getPort();
1037         }
1038 
1039         int pos = bb.position();
1040         int lim = bb.limit();
1041         assert (pos <= lim);
1042         int rem = (pos <= lim ? lim - pos : 0);
1043 
1044         int written = send0(fd, ((DirectBuffer)bb).address() + pos, rem, addr,
1045                             port, -1 /*121*/, streamNumber, unordered, ppid);
1046         if (written > 0)
1047             bb.position(pos + written);
1048         return written;
1049     }
1050 
1051     @Override
shutdown()1052     public SctpChannel shutdown() throws IOException {
1053         synchronized(stateLock) {
1054             if (isShutdown)
1055                 return this;
1056 
1057             ensureSendOpen();
1058             SctpNet.shutdown(fdVal, -1);
1059             if (senderThread != 0)
1060                 NativeThread.signal(senderThread);
1061             isShutdown = true;
1062         }
1063         return this;
1064     }
1065 
1066     @Override
getAllLocalAddresses()1067     public Set<SocketAddress> getAllLocalAddresses()
1068             throws IOException {
1069         synchronized (stateLock) {
1070             if (!isOpen())
1071                 throw new ClosedChannelException();
1072             if (!isBound())
1073                 return Collections.emptySet();
1074 
1075             return SctpNet.getLocalAddresses(fdVal);
1076         }
1077     }
1078 
1079     @Override
getRemoteAddresses()1080     public Set<SocketAddress> getRemoteAddresses()
1081             throws IOException {
1082         synchronized (stateLock) {
1083             if (!isOpen())
1084                 throw new ClosedChannelException();
1085             if (!isConnected() || isShutdown)
1086                 return Collections.emptySet();
1087 
1088             try {
1089                 return SctpNet.getRemoteAddresses(fdVal, 0/*unused*/);
1090             } catch (SocketException unused) {
1091                 /* an open connected channel should always have remote addresses */
1092                 return remoteAddresses;
1093             }
1094         }
1095     }
1096 
1097     /* Native */
initIDs()1098     private static native void initIDs();
1099 
receive0(int fd, ResultContainer resultContainer, long address, int length, boolean peek)1100     static native int receive0(int fd, ResultContainer resultContainer,
1101             long address, int length, boolean peek) throws IOException;
1102 
send0(int fd, long address, int length, InetAddress addr, int port, int assocId, int streamNumber, boolean unordered, int ppid)1103     static native int send0(int fd, long address, int length,
1104             InetAddress addr, int port, int assocId, int streamNumber,
1105             boolean unordered, int ppid) throws IOException;
1106 
checkConnect(FileDescriptor fd, boolean block, boolean ready)1107     private static native int checkConnect(FileDescriptor fd, boolean block,
1108             boolean ready) throws IOException;
1109 
1110     static {
IOUtil.load()1111         IOUtil.load();   /* loads nio & net native libraries */
java.security.AccessController.doPrivileged( new java.security.PrivilegedAction<Void>() { public Void run() { System.loadLibrary(R); return null; } })1112         java.security.AccessController.doPrivileged(
1113             new java.security.PrivilegedAction<Void>() {
1114                 public Void run() {
1115                     System.loadLibrary("sctp");
1116                     return null;
1117                 }
1118             });
initIDs()1119         initIDs();
1120     }
1121 }
1122