1 /*
2  * Copyright (c) 2009, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 
24 /* @test
25  * @bug 6863110
26  * @summary Newly connected/accepted SctpChannel should fire OP_READ if registered with a Selector
27  * @author chegar
28  */
29 
30 import java.net.InetSocketAddress;
31 import java.net.SocketAddress;
32 import java.io.IOException;
33 import java.util.Iterator;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.nio.ByteBuffer;
37 import java.nio.channels.Selector;
38 import java.nio.channels.SelectionKey;
39 import com.sun.nio.sctp.AbstractNotificationHandler;
40 import com.sun.nio.sctp.AssociationChangeNotification;
41 import com.sun.nio.sctp.AssociationChangeNotification.AssocChangeEvent;
42 import com.sun.nio.sctp.HandlerResult;
43 import com.sun.nio.sctp.Notification;
44 import com.sun.nio.sctp.SctpChannel;
45 import com.sun.nio.sctp.SctpServerChannel;
46 import com.sun.nio.sctp.ShutdownNotification;
47 import static java.lang.System.out;
48 import static java.lang.System.err;
49 import static java.nio.channels.SelectionKey.OP_CONNECT;
50 import static java.nio.channels.SelectionKey.OP_READ;
51 
52 public class CommUp {
53     static CountDownLatch acceptLatch = new CountDownLatch(1);
54     static final int TIMEOUT = 10000;
55 
56     CommUpNotificationHandler clientHandler = new CommUpNotificationHandler();
57     CommUpNotificationHandler serverHandler = new CommUpNotificationHandler();
58     CommUpServer server;
59     Thread clientThread;
60 
test(String[] args)61     void test(String[] args) {
62         SocketAddress address = null;
63 
64         if (!Util.isSCTPSupported()) {
65             out.println("SCTP protocol is not supported");
66             out.println("Test cannot be run");
67             return;
68         }
69 
70         if (args.length == 2) {
71             /* requested to connecct to a specific address */
72             try {
73                 int port = Integer.valueOf(args[1]);
74                 address = new InetSocketAddress(args[0], port);
75             } catch (NumberFormatException nfe) {
76                 err.println(nfe);
77             }
78         } else {
79             /* start server on local machine, default */
80             try {
81                 server = new CommUpServer();
82                 server.start();
83                 address = server.address();
84                 debug("Server started and listening on " + address);
85             } catch (IOException ioe) {
86                 ioe.printStackTrace();
87                 return;
88             }
89         }
90 
91         /* store the main thread so that the server can interrupt it, if necessary */
92         clientThread = Thread.currentThread();
93 
94         doClient(address);
95     }
96 
doClient(SocketAddress peerAddress)97     void doClient(SocketAddress peerAddress) {
98         SctpChannel sc = null;
99         try {
100             debug("connecting to " + peerAddress);
101             sc = SctpChannel.open();
102             sc.configureBlocking(false);
103             check(sc.isBlocking() == false, "Should be in non-blocking mode");
104             sc.connect(peerAddress);
105 
106             Selector selector = Selector.open();
107             SelectionKey selectiontKey = sc.register(selector, OP_CONNECT);
108 
109             /* Expect two interest Ops */
110             boolean opConnectReceived = false;
111             boolean opReadReceived = false;
112             for (int z=0; z<2; z++) {
113                 debug("select " + z);
114                 int keysAdded = selector.select(TIMEOUT);
115                 debug("returned " + keysAdded + " keys");
116                 if (keysAdded > 0) {
117                     Set<SelectionKey> keys = selector.selectedKeys();
118                     Iterator<SelectionKey> i = keys.iterator();
119                     while(i.hasNext()) {
120                         SelectionKey sk = i.next();
121                         i.remove();
122                         SctpChannel readyChannel =
123                             (SctpChannel)sk.channel();
124 
125                         /* OP_CONNECT */
126                         if (sk.isConnectable()) {
127                             /* some trivial checks */
128                             check(opConnectReceived == false,
129                                   "should only received one OP_CONNECT");
130                             check(opReadReceived == false,
131                                   "should not receive OP_READ before OP_CONNECT");
132                             check(readyChannel.equals(sc),
133                                   "channels should be equal");
134                             check(!sk.isAcceptable(),
135                                   "key should not be acceptable");
136                             check(!sk.isReadable(),
137                                   "key should not be readable");
138                             check(!sk.isWritable(),
139                                   "key should not be writable");
140 
141                             /* now process the OP_CONNECT */
142                             opConnectReceived = true;
143                             check((sk.interestOps() & OP_CONNECT) == OP_CONNECT,
144                                   "selection key interest ops should contain OP_CONNECT");
145                             sk.interestOps(OP_READ);
146                             check((sk.interestOps() & OP_CONNECT) != OP_CONNECT,
147                                   "selection key interest ops should not contain OP_CONNECT");
148                             check(sc.finishConnect(),
149                                   "finishConnect should return true");
150                         } /* OP_READ */
151                           else if (sk.isReadable()) {
152                             /* some trivial checks */
153                             check(opConnectReceived == true,
154                                   "should receive one OP_CONNECT before OP_READ");
155                             check(opReadReceived == false,
156                                   "should not receive OP_READ before OP_CONNECT");
157                             check(readyChannel.equals(sc),
158                                   "channels should be equal");
159                             check(!sk.isAcceptable(),
160                                   "key should not be acceptable");
161                             check(sk.isReadable(),
162                                   "key should be readable");
163                             check(!sk.isWritable(),
164                                   "key should not be writable");
165                             check(!sk.isConnectable(),
166                                   "key should not be connectable");
167 
168                             /* now process the OP_READ */
169                             opReadReceived = true;
170                             selectiontKey.cancel();
171 
172                             /* try with small buffer to see if native
173                              * implementation can handle this */
174                             ByteBuffer buffer = ByteBuffer.allocateDirect(1);
175                             readyChannel.receive(buffer, null, clientHandler);
176                             check(clientHandler.receivedCommUp(),
177                                     "Client should have received COMM_UP");
178 
179                             /* dont close (or put anything on) the channel until
180                              * we check that the server's accepted channel also
181                              * received COMM_UP */
182                             serverHandler.waitForCommUp();
183                         } else {
184                             fail("Unexpected selection key");
185                         }
186                     }
187                 } else {
188                     fail("Client selector returned 0 ready keys");
189                     /* stop the server */
190                     server.thread().interrupt();
191                 }
192             } //for
193 
194         } catch (IOException ioe) {
195             unexpected(ioe);
196         } catch (InterruptedException ie) {
197             unexpected(ie);
198         }
199     }
200 
201     class CommUpServer implements Runnable
202     {
203         final InetSocketAddress serverAddr;
204         private SctpServerChannel ssc;
205         private Thread serverThread;
206 
CommUpServer()207         public CommUpServer() throws IOException {
208             ssc = SctpServerChannel.open().bind(null);
209             java.util.Set<SocketAddress> addrs = ssc.getAllLocalAddresses();
210             if (addrs.isEmpty())
211                 debug("addrs should not be empty");
212 
213             serverAddr = (InetSocketAddress) addrs.iterator().next();
214         }
215 
start()216         void start() {
217             serverThread = new Thread(this, "CommUpServer-"  +
218                                               serverAddr.getPort());
219             serverThread.start();
220         }
221 
address()222         InetSocketAddress address () {
223             return serverAddr;
224         }
225 
thread()226         Thread thread() {
227             return serverThread;
228         }
229 
230         @Override
run()231         public void run() {
232             Selector selector = null;
233             SctpChannel sc = null;
234             SelectionKey readKey = null;
235             try {
236                 sc = ssc.accept();
237                 debug("accepted " + sc);
238 
239                 selector = Selector.open();
240                 sc.configureBlocking(false);
241                 check(sc.isBlocking() == false, "Should be in non-blocking mode");
242                 readKey = sc.register(selector, SelectionKey.OP_READ);
243 
244                 debug("select");
245                 int keysAdded = selector.select(TIMEOUT);
246                 debug("returned " + keysAdded + " keys");
247                 if (keysAdded > 0) {
248                     Set<SelectionKey> keys = selector.selectedKeys();
249                     Iterator<SelectionKey> i = keys.iterator();
250                     while(i.hasNext()) {
251                         SelectionKey sk = i.next();
252                         i.remove();
253                         SctpChannel readyChannel =
254                             (SctpChannel)sk.channel();
255                         check(readyChannel.equals(sc),
256                                 "channels should be equal");
257                         check(!sk.isAcceptable(),
258                                 "key should not be acceptable");
259                         check(sk.isReadable(),
260                                 "key should be readable");
261                         check(!sk.isWritable(),
262                                 "key should not be writable");
263                         check(!sk.isConnectable(),
264                                 "key should not be connectable");
265 
266                         /* block until we check if the client has received its COMM_UP*/
267                         clientHandler.waitForCommUp();
268 
269                         ByteBuffer buffer = ByteBuffer.allocateDirect(1);
270                         sc.receive(buffer, null, serverHandler);
271                         check(serverHandler.receivedCommUp(),
272                                 "Accepted channel should have received COMM_UP");
273                     }
274                 } else {
275                    fail("Server selector returned 0 ready keys");
276                    /* stop the client */
277                    clientThread.interrupt();
278             }
279             } catch (IOException ioe) {
280                 ioe.printStackTrace();
281             } catch (InterruptedException unused) {
282             } finally {
283                 if (readKey != null) readKey.cancel();
284                 try { if (selector != null) selector.close(); }
285                 catch (IOException  ioe) { unexpected(ioe); }
286                 try { if (ssc != null) ssc.close(); }
287                 catch (IOException  ioe) { unexpected(ioe); }
288                 try { if (sc != null) sc.close(); }
289                 catch (IOException  ioe) { unexpected(ioe); }
290             }
291         }
292     }
293 
294     class CommUpNotificationHandler extends AbstractNotificationHandler<Object>
295     {
296         private boolean receivedCommUp;  // false
297 
receivedCommUp()298         public synchronized boolean receivedCommUp() {
299             return receivedCommUp;
300         }
301 
waitForCommUp()302         public synchronized boolean waitForCommUp() throws InterruptedException {
303             while (receivedCommUp == false) {
304                 wait();
305             }
306 
307             return false;
308         }
309 
310         @Override
handleNotification( Notification notification, Object attachment)311         public HandlerResult handleNotification(
312                 Notification notification, Object attachment) {
313             fail("Unknown notification type");
314             return HandlerResult.CONTINUE;
315         }
316 
317         @Override
handleNotification( AssociationChangeNotification notification, Object attachment)318         public synchronized HandlerResult handleNotification(
319                 AssociationChangeNotification notification, Object attachment) {
320             AssocChangeEvent event = notification.event();
321             debug("AssociationChangeNotification");
322             debug("  Association: " + notification.association());
323             debug("  Event: " + event);
324 
325             if (event.equals(AssocChangeEvent.COMM_UP)) {
326                 receivedCommUp = true;
327                 notifyAll();
328             }
329 
330             return HandlerResult.RETURN;
331         }
332 
333         @Override
handleNotification( ShutdownNotification notification, Object attachment)334         public HandlerResult handleNotification(
335                 ShutdownNotification notification, Object attachment) {
336             debug("ShutdownNotification");
337             debug("  Association: " + notification.association());
338             return HandlerResult.RETURN;
339         }
340     }
341 
342         //--------------------- Infrastructure ---------------------------
343     boolean debug = true;
344     volatile int passed = 0, failed = 0;
pass()345     void pass() {passed++;}
fail()346     void fail() {failed++; Thread.dumpStack();}
fail(String msg)347     void fail(String msg) {err.println(msg); fail();}
unexpected(Throwable t)348     void unexpected(Throwable t) {failed++; t.printStackTrace();}
check(boolean cond)349     void check(boolean cond) {if (cond) pass(); else fail();}
check(boolean cond, String failMessage)350     void check(boolean cond, String failMessage) {if (cond) pass(); else fail(failMessage);}
debug(String message)351     void debug(String message) {if(debug) { out.println(Thread.currentThread().getName() + ": " + message); }  }
sleep(long millis)352     void sleep(long millis) { try { Thread.currentThread().sleep(millis); }
353                           catch(InterruptedException ie) { unexpected(ie); }}
main(String[] args)354     public static void main(String[] args) throws Throwable {
355         Class<?> k = new Object(){}.getClass().getEnclosingClass();
356         try {k.getMethod("instanceMain",String[].class)
357                 .invoke( k.newInstance(), (Object) args);}
358         catch (Throwable e) {throw e.getCause();}}
instanceMain(String[] args)359     public void instanceMain(String[] args) throws Throwable {
360         try {test(args);} catch (Throwable t) {unexpected(t);}
361         out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
362         if (failed > 0) throw new AssertionError("Some tests failed");}
363 
364 }
365