1 /*
2  * Copyright (c) 2009, 2019, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 
24 /* @test
25  * @bug 6863110
26  * @summary Newly connected/accepted SctpChannel should fire OP_READ if registered with a Selector
27  * @author chegar
28  */
29 
30 import java.net.InetSocketAddress;
31 import java.net.SocketAddress;
32 import java.io.IOException;
33 import java.util.Iterator;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.nio.ByteBuffer;
37 import java.nio.channels.Selector;
38 import java.nio.channels.SelectionKey;
39 import com.sun.nio.sctp.AbstractNotificationHandler;
40 import com.sun.nio.sctp.AssociationChangeNotification;
41 import com.sun.nio.sctp.AssociationChangeNotification.AssocChangeEvent;
42 import com.sun.nio.sctp.HandlerResult;
43 import com.sun.nio.sctp.Notification;
44 import com.sun.nio.sctp.SctpChannel;
45 import com.sun.nio.sctp.SctpServerChannel;
46 import com.sun.nio.sctp.ShutdownNotification;
47 import static java.lang.System.out;
48 import static java.lang.System.err;
49 import static java.nio.channels.SelectionKey.OP_CONNECT;
50 import static java.nio.channels.SelectionKey.OP_READ;
51 
52 public class CommUp {
53     static CountDownLatch acceptLatch = new CountDownLatch(1);
54     static final int TIMEOUT = 10000;
55 
56     CommUpNotificationHandler clientHandler = new CommUpNotificationHandler();
57     CommUpNotificationHandler serverHandler = new CommUpNotificationHandler();
58     CommUpServer server;
59     Thread clientThread;
60 
test(String[] args)61     void test(String[] args) {
62         SocketAddress address = null;
63 
64         if (!Util.isSCTPSupported()) {
65             out.println("SCTP protocol is not supported");
66             out.println("Test cannot be run");
67             return;
68         }
69 
70         if (args.length == 2) {
71             /* requested to connecct to a specific address */
72             try {
73                 int port = Integer.valueOf(args[1]);
74                 address = new InetSocketAddress(args[0], port);
75             } catch (NumberFormatException nfe) {
76                 err.println(nfe);
77             }
78         } else {
79             /* start server on local machine, default */
80             try {
81                 server = new CommUpServer();
82                 server.start();
83                 address = server.address();
84                 debug("Server started and listening on " + address);
85             } catch (IOException ioe) {
86                 ioe.printStackTrace();
87                 return;
88             }
89         }
90 
91         /* store the main thread so that the server can interrupt it, if necessary */
92         clientThread = Thread.currentThread();
93 
94         doClient(address);
95     }
96 
doClient(SocketAddress peerAddress)97     void doClient(SocketAddress peerAddress) {
98         SctpChannel sc = null;
99         try {
100             debug("connecting to " + peerAddress);
101             sc = SctpChannel.open();
102             sc.configureBlocking(false);
103             check(sc.isBlocking() == false, "Should be in non-blocking mode");
104             sc.connect(peerAddress);
105 
106             Selector selector = Selector.open();
107             SelectionKey selectiontKey = sc.register(selector, OP_CONNECT);
108 
109             /* Expect two interest Ops */
110             boolean opConnectReceived = false;
111             boolean opReadReceived = false;
112             for (int z=0; z<2; z++) {
113                 debug("select " + z);
114                 int keysAdded = selector.select(TIMEOUT);
115                 debug("returned " + keysAdded + " keys");
116                 if (keysAdded > 0) {
117                     Set<SelectionKey> keys = selector.selectedKeys();
118                     Iterator<SelectionKey> i = keys.iterator();
119                     while(i.hasNext()) {
120                         SelectionKey sk = i.next();
121                         i.remove();
122                         SctpChannel readyChannel =
123                             (SctpChannel)sk.channel();
124 
125                         /* OP_CONNECT */
126                         if (sk.isConnectable()) {
127                             /* some trivial checks */
128                             check(opConnectReceived == false,
129                                   "should only received one OP_CONNECT");
130                             check(opReadReceived == false,
131                                   "should not receive OP_READ before OP_CONNECT");
132                             check(readyChannel.equals(sc),
133                                   "channels should be equal");
134                             check(!sk.isAcceptable(),
135                                   "key should not be acceptable");
136                             check(!sk.isReadable(),
137                                   "key should not be readable");
138                             check(!sk.isWritable(),
139                                   "key should not be writable");
140 
141                             /* now process the OP_CONNECT */
142                             opConnectReceived = true;
143                             check((sk.interestOps() & OP_CONNECT) == OP_CONNECT,
144                                   "selection key interest ops should contain OP_CONNECT");
145                             sk.interestOps(OP_READ);
146                             check((sk.interestOps() & OP_CONNECT) != OP_CONNECT,
147                                   "selection key interest ops should not contain OP_CONNECT");
148                             check(sc.finishConnect(),
149                                   "finishConnect should return true");
150                         } /* OP_READ */
151                           else if (sk.isReadable()) {
152                             /* some trivial checks */
153                             check(opConnectReceived == true,
154                                   "should receive one OP_CONNECT before OP_READ");
155                             check(opReadReceived == false,
156                                   "should not receive OP_READ before OP_CONNECT");
157                             check(readyChannel.equals(sc),
158                                   "channels should be equal");
159                             check(!sk.isAcceptable(),
160                                   "key should not be acceptable");
161                             check(sk.isReadable(),
162                                   "key should be readable");
163                             check(!sk.isWritable(),
164                                   "key should not be writable");
165                             check(!sk.isConnectable(),
166                                   "key should not be connectable");
167 
168                             /* now process the OP_READ */
169                             opReadReceived = true;
170                             selectiontKey.cancel();
171 
172                             /* try with small buffer to see if native
173                              * implementation can handle this */
174                             ByteBuffer buffer = ByteBuffer.allocateDirect(1);
175                             readyChannel.receive(buffer, null, clientHandler);
176                             check(clientHandler.receivedCommUp(),
177                                     "Client should have received COMM_UP");
178 
179                             /* dont close (or put anything on) the channel until
180                              * we check that the server's accepted channel also
181                              * received COMM_UP */
182                             serverHandler.waitForCommUp();
183                         } else {
184                             fail("Unexpected selection key");
185                         }
186                     }
187                 } else {
188                     fail("Client selector returned 0 ready keys");
189                     /* stop the server */
190                     server.thread().interrupt();
191                 }
192             } //for
193 
194             try { sc.close(); }
195             catch (IOException ioe) { unexpected(ioe); }
196 
197         } catch (IOException ioe) {
198             unexpected(ioe);
199         } catch (InterruptedException ie) {
200             unexpected(ie);
201         }
202     }
203 
204     class CommUpServer implements Runnable
205     {
206         final InetSocketAddress serverAddr;
207         private SctpServerChannel ssc;
208         private Thread serverThread;
209 
CommUpServer()210         public CommUpServer() throws IOException {
211             ssc = SctpServerChannel.open().bind(null);
212             java.util.Set<SocketAddress> addrs = ssc.getAllLocalAddresses();
213             if (addrs.isEmpty())
214                 debug("addrs should not be empty");
215 
216             serverAddr = (InetSocketAddress) addrs.iterator().next();
217         }
218 
start()219         void start() {
220             serverThread = new Thread(this, "CommUpServer-"  +
221                                               serverAddr.getPort());
222             serverThread.start();
223         }
224 
address()225         InetSocketAddress address () {
226             return serverAddr;
227         }
228 
thread()229         Thread thread() {
230             return serverThread;
231         }
232 
233         @Override
run()234         public void run() {
235             Selector selector = null;
236             SctpChannel sc = null;
237             SelectionKey readKey = null;
238             try {
239                 sc = ssc.accept();
240                 debug("accepted " + sc);
241 
242                 selector = Selector.open();
243                 sc.configureBlocking(false);
244                 check(sc.isBlocking() == false, "Should be in non-blocking mode");
245                 readKey = sc.register(selector, SelectionKey.OP_READ);
246 
247                 debug("select");
248                 int keysAdded = selector.select(TIMEOUT);
249                 debug("returned " + keysAdded + " keys");
250                 if (keysAdded > 0) {
251                     Set<SelectionKey> keys = selector.selectedKeys();
252                     Iterator<SelectionKey> i = keys.iterator();
253                     while(i.hasNext()) {
254                         SelectionKey sk = i.next();
255                         i.remove();
256                         SctpChannel readyChannel =
257                             (SctpChannel)sk.channel();
258                         check(readyChannel.equals(sc),
259                                 "channels should be equal");
260                         check(!sk.isAcceptable(),
261                                 "key should not be acceptable");
262                         check(sk.isReadable(),
263                                 "key should be readable");
264                         check(!sk.isWritable(),
265                                 "key should not be writable");
266                         check(!sk.isConnectable(),
267                                 "key should not be connectable");
268 
269                         /* block until we check if the client has received its COMM_UP*/
270                         clientHandler.waitForCommUp();
271 
272                         ByteBuffer buffer = ByteBuffer.allocateDirect(1);
273                         sc.receive(buffer, null, serverHandler);
274                         check(serverHandler.receivedCommUp(),
275                                 "Accepted channel should have received COMM_UP");
276                     }
277                 } else {
278                    fail("Server selector returned 0 ready keys");
279                    /* stop the client */
280                    clientThread.interrupt();
281             }
282             } catch (IOException ioe) {
283                 ioe.printStackTrace();
284             } catch (InterruptedException unused) {
285             } finally {
286                 if (readKey != null) readKey.cancel();
287                 try { if (selector != null) selector.close(); }
288                 catch (IOException  ioe) { unexpected(ioe); }
289                 try { if (ssc != null) ssc.close(); }
290                 catch (IOException  ioe) { unexpected(ioe); }
291                 try { if (sc != null) sc.close(); }
292                 catch (IOException  ioe) { unexpected(ioe); }
293             }
294         }
295     }
296 
297     class CommUpNotificationHandler extends AbstractNotificationHandler<Object>
298     {
299         private boolean receivedCommUp;  // false
300 
receivedCommUp()301         public synchronized boolean receivedCommUp() {
302             return receivedCommUp;
303         }
304 
waitForCommUp()305         public synchronized boolean waitForCommUp() throws InterruptedException {
306             while (receivedCommUp == false) {
307                 wait();
308             }
309 
310             return false;
311         }
312 
313         @Override
handleNotification( Notification notification, Object attachment)314         public HandlerResult handleNotification(
315                 Notification notification, Object attachment) {
316             fail("Unknown notification type");
317             return HandlerResult.CONTINUE;
318         }
319 
320         @Override
handleNotification( AssociationChangeNotification notification, Object attachment)321         public synchronized HandlerResult handleNotification(
322                 AssociationChangeNotification notification, Object attachment) {
323             AssocChangeEvent event = notification.event();
324             debug("AssociationChangeNotification");
325             debug("  Association: " + notification.association());
326             debug("  Event: " + event);
327 
328             if (event.equals(AssocChangeEvent.COMM_UP)) {
329                 receivedCommUp = true;
330                 notifyAll();
331             }
332 
333             return HandlerResult.RETURN;
334         }
335 
336         @Override
handleNotification( ShutdownNotification notification, Object attachment)337         public HandlerResult handleNotification(
338                 ShutdownNotification notification, Object attachment) {
339             debug("ShutdownNotification");
340             debug("  Association: " + notification.association());
341             return HandlerResult.RETURN;
342         }
343     }
344 
345         //--------------------- Infrastructure ---------------------------
346     boolean debug = true;
347     volatile int passed = 0, failed = 0;
pass()348     void pass() {passed++;}
fail()349     void fail() {failed++; Thread.dumpStack();}
fail(String msg)350     void fail(String msg) {err.println(msg); fail();}
unexpected(Throwable t)351     void unexpected(Throwable t) {failed++; t.printStackTrace();}
check(boolean cond)352     void check(boolean cond) {if (cond) pass(); else fail();}
check(boolean cond, String failMessage)353     void check(boolean cond, String failMessage) {if (cond) pass(); else fail(failMessage);}
debug(String message)354     void debug(String message) {if(debug) { out.println(Thread.currentThread().getName() + ": " + message); }  }
sleep(long millis)355     void sleep(long millis) { try { Thread.currentThread().sleep(millis); }
356                           catch(InterruptedException ie) { unexpected(ie); }}
main(String[] args)357     public static void main(String[] args) throws Throwable {
358         Class<?> k = new Object(){}.getClass().getEnclosingClass();
359         try {k.getMethod("instanceMain",String[].class)
360                 .invoke( k.newInstance(), (Object) args);}
361         catch (Throwable e) {throw e.getCause();}}
instanceMain(String[] args)362     public void instanceMain(String[] args) throws Throwable {
363         try {test(args);} catch (Throwable t) {unexpected(t);}
364         out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
365         if (failed > 0) throw new AssertionError("Some tests failed");}
366 
367 }
368