1 /* 2 * Copyright (c) 2009, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* @test 25 * @bug 6863110 26 * @summary Newly connected/accepted SctpChannel should fire OP_READ if registered with a Selector 27 * @author chegar 28 */ 29 30 import java.net.InetSocketAddress; 31 import java.net.SocketAddress; 32 import java.io.IOException; 33 import java.util.Iterator; 34 import java.util.Set; 35 import java.util.concurrent.CountDownLatch; 36 import java.nio.ByteBuffer; 37 import java.nio.channels.Selector; 38 import java.nio.channels.SelectionKey; 39 import com.sun.nio.sctp.AbstractNotificationHandler; 40 import com.sun.nio.sctp.AssociationChangeNotification; 41 import com.sun.nio.sctp.AssociationChangeNotification.AssocChangeEvent; 42 import com.sun.nio.sctp.HandlerResult; 43 import com.sun.nio.sctp.Notification; 44 import com.sun.nio.sctp.SctpChannel; 45 import com.sun.nio.sctp.SctpServerChannel; 46 import com.sun.nio.sctp.ShutdownNotification; 47 import static java.lang.System.out; 48 import static java.lang.System.err; 49 import static java.nio.channels.SelectionKey.OP_CONNECT; 50 import static java.nio.channels.SelectionKey.OP_READ; 51 52 public class CommUp { 53 static CountDownLatch acceptLatch = new CountDownLatch(1); 54 static final int TIMEOUT = 10000; 55 56 CommUpNotificationHandler clientHandler = new CommUpNotificationHandler(); 57 CommUpNotificationHandler serverHandler = new CommUpNotificationHandler(); 58 CommUpServer server; 59 Thread clientThread; 60 test(String[] args)61 void test(String[] args) { 62 SocketAddress address = null; 63 64 if (!Util.isSCTPSupported()) { 65 out.println("SCTP protocol is not supported"); 66 out.println("Test cannot be run"); 67 return; 68 } 69 70 if (args.length == 2) { 71 /* requested to connecct to a specific address */ 72 try { 73 int port = Integer.valueOf(args[1]); 74 address = new InetSocketAddress(args[0], port); 75 } catch (NumberFormatException nfe) { 76 err.println(nfe); 77 } 78 } else { 79 /* start server on local machine, default */ 80 try { 81 server = new CommUpServer(); 82 server.start(); 83 address = server.address(); 84 debug("Server started and listening on " + address); 85 } catch (IOException ioe) { 86 ioe.printStackTrace(); 87 return; 88 } 89 } 90 91 /* store the main thread so that the server can interrupt it, if necessary */ 92 clientThread = Thread.currentThread(); 93 94 doClient(address); 95 } 96 doClient(SocketAddress peerAddress)97 void doClient(SocketAddress peerAddress) { 98 SctpChannel sc = null; 99 try { 100 debug("connecting to " + peerAddress); 101 sc = SctpChannel.open(); 102 sc.configureBlocking(false); 103 check(sc.isBlocking() == false, "Should be in non-blocking mode"); 104 sc.connect(peerAddress); 105 106 Selector selector = Selector.open(); 107 SelectionKey selectiontKey = sc.register(selector, OP_CONNECT); 108 109 /* Expect two interest Ops */ 110 boolean opConnectReceived = false; 111 boolean opReadReceived = false; 112 for (int z=0; z<2; z++) { 113 debug("select " + z); 114 int keysAdded = selector.select(TIMEOUT); 115 debug("returned " + keysAdded + " keys"); 116 if (keysAdded > 0) { 117 Set<SelectionKey> keys = selector.selectedKeys(); 118 Iterator<SelectionKey> i = keys.iterator(); 119 while(i.hasNext()) { 120 SelectionKey sk = i.next(); 121 i.remove(); 122 SctpChannel readyChannel = 123 (SctpChannel)sk.channel(); 124 125 /* OP_CONNECT */ 126 if (sk.isConnectable()) { 127 /* some trivial checks */ 128 check(opConnectReceived == false, 129 "should only received one OP_CONNECT"); 130 check(opReadReceived == false, 131 "should not receive OP_READ before OP_CONNECT"); 132 check(readyChannel.equals(sc), 133 "channels should be equal"); 134 check(!sk.isAcceptable(), 135 "key should not be acceptable"); 136 check(!sk.isReadable(), 137 "key should not be readable"); 138 check(!sk.isWritable(), 139 "key should not be writable"); 140 141 /* now process the OP_CONNECT */ 142 opConnectReceived = true; 143 check((sk.interestOps() & OP_CONNECT) == OP_CONNECT, 144 "selection key interest ops should contain OP_CONNECT"); 145 sk.interestOps(OP_READ); 146 check((sk.interestOps() & OP_CONNECT) != OP_CONNECT, 147 "selection key interest ops should not contain OP_CONNECT"); 148 check(sc.finishConnect(), 149 "finishConnect should return true"); 150 } /* OP_READ */ 151 else if (sk.isReadable()) { 152 /* some trivial checks */ 153 check(opConnectReceived == true, 154 "should receive one OP_CONNECT before OP_READ"); 155 check(opReadReceived == false, 156 "should not receive OP_READ before OP_CONNECT"); 157 check(readyChannel.equals(sc), 158 "channels should be equal"); 159 check(!sk.isAcceptable(), 160 "key should not be acceptable"); 161 check(sk.isReadable(), 162 "key should be readable"); 163 check(!sk.isWritable(), 164 "key should not be writable"); 165 check(!sk.isConnectable(), 166 "key should not be connectable"); 167 168 /* now process the OP_READ */ 169 opReadReceived = true; 170 selectiontKey.cancel(); 171 172 /* try with small buffer to see if native 173 * implementation can handle this */ 174 ByteBuffer buffer = ByteBuffer.allocateDirect(1); 175 readyChannel.receive(buffer, null, clientHandler); 176 check(clientHandler.receivedCommUp(), 177 "Client should have received COMM_UP"); 178 179 /* dont close (or put anything on) the channel until 180 * we check that the server's accepted channel also 181 * received COMM_UP */ 182 serverHandler.waitForCommUp(); 183 } else { 184 fail("Unexpected selection key"); 185 } 186 } 187 } else { 188 fail("Client selector returned 0 ready keys"); 189 /* stop the server */ 190 server.thread().interrupt(); 191 } 192 } //for 193 194 try { sc.close(); } 195 catch (IOException ioe) { unexpected(ioe); } 196 197 } catch (IOException ioe) { 198 unexpected(ioe); 199 } catch (InterruptedException ie) { 200 unexpected(ie); 201 } 202 } 203 204 class CommUpServer implements Runnable 205 { 206 final InetSocketAddress serverAddr; 207 private SctpServerChannel ssc; 208 private Thread serverThread; 209 CommUpServer()210 public CommUpServer() throws IOException { 211 ssc = SctpServerChannel.open().bind(null); 212 java.util.Set<SocketAddress> addrs = ssc.getAllLocalAddresses(); 213 if (addrs.isEmpty()) 214 debug("addrs should not be empty"); 215 216 serverAddr = (InetSocketAddress) addrs.iterator().next(); 217 } 218 start()219 void start() { 220 serverThread = new Thread(this, "CommUpServer-" + 221 serverAddr.getPort()); 222 serverThread.start(); 223 } 224 address()225 InetSocketAddress address () { 226 return serverAddr; 227 } 228 thread()229 Thread thread() { 230 return serverThread; 231 } 232 233 @Override run()234 public void run() { 235 Selector selector = null; 236 SctpChannel sc = null; 237 SelectionKey readKey = null; 238 try { 239 sc = ssc.accept(); 240 debug("accepted " + sc); 241 242 selector = Selector.open(); 243 sc.configureBlocking(false); 244 check(sc.isBlocking() == false, "Should be in non-blocking mode"); 245 readKey = sc.register(selector, SelectionKey.OP_READ); 246 247 debug("select"); 248 int keysAdded = selector.select(TIMEOUT); 249 debug("returned " + keysAdded + " keys"); 250 if (keysAdded > 0) { 251 Set<SelectionKey> keys = selector.selectedKeys(); 252 Iterator<SelectionKey> i = keys.iterator(); 253 while(i.hasNext()) { 254 SelectionKey sk = i.next(); 255 i.remove(); 256 SctpChannel readyChannel = 257 (SctpChannel)sk.channel(); 258 check(readyChannel.equals(sc), 259 "channels should be equal"); 260 check(!sk.isAcceptable(), 261 "key should not be acceptable"); 262 check(sk.isReadable(), 263 "key should be readable"); 264 check(!sk.isWritable(), 265 "key should not be writable"); 266 check(!sk.isConnectable(), 267 "key should not be connectable"); 268 269 /* block until we check if the client has received its COMM_UP*/ 270 clientHandler.waitForCommUp(); 271 272 ByteBuffer buffer = ByteBuffer.allocateDirect(1); 273 sc.receive(buffer, null, serverHandler); 274 check(serverHandler.receivedCommUp(), 275 "Accepted channel should have received COMM_UP"); 276 } 277 } else { 278 fail("Server selector returned 0 ready keys"); 279 /* stop the client */ 280 clientThread.interrupt(); 281 } 282 } catch (IOException ioe) { 283 ioe.printStackTrace(); 284 } catch (InterruptedException unused) { 285 } finally { 286 if (readKey != null) readKey.cancel(); 287 try { if (selector != null) selector.close(); } 288 catch (IOException ioe) { unexpected(ioe); } 289 try { if (ssc != null) ssc.close(); } 290 catch (IOException ioe) { unexpected(ioe); } 291 try { if (sc != null) sc.close(); } 292 catch (IOException ioe) { unexpected(ioe); } 293 } 294 } 295 } 296 297 class CommUpNotificationHandler extends AbstractNotificationHandler<Object> 298 { 299 private boolean receivedCommUp; // false 300 receivedCommUp()301 public synchronized boolean receivedCommUp() { 302 return receivedCommUp; 303 } 304 waitForCommUp()305 public synchronized boolean waitForCommUp() throws InterruptedException { 306 while (receivedCommUp == false) { 307 wait(); 308 } 309 310 return false; 311 } 312 313 @Override handleNotification( Notification notification, Object attachment)314 public HandlerResult handleNotification( 315 Notification notification, Object attachment) { 316 fail("Unknown notification type"); 317 return HandlerResult.CONTINUE; 318 } 319 320 @Override handleNotification( AssociationChangeNotification notification, Object attachment)321 public synchronized HandlerResult handleNotification( 322 AssociationChangeNotification notification, Object attachment) { 323 AssocChangeEvent event = notification.event(); 324 debug("AssociationChangeNotification"); 325 debug(" Association: " + notification.association()); 326 debug(" Event: " + event); 327 328 if (event.equals(AssocChangeEvent.COMM_UP)) { 329 receivedCommUp = true; 330 notifyAll(); 331 } 332 333 return HandlerResult.RETURN; 334 } 335 336 @Override handleNotification( ShutdownNotification notification, Object attachment)337 public HandlerResult handleNotification( 338 ShutdownNotification notification, Object attachment) { 339 debug("ShutdownNotification"); 340 debug(" Association: " + notification.association()); 341 return HandlerResult.RETURN; 342 } 343 } 344 345 //--------------------- Infrastructure --------------------------- 346 boolean debug = true; 347 volatile int passed = 0, failed = 0; pass()348 void pass() {passed++;} fail()349 void fail() {failed++; Thread.dumpStack();} fail(String msg)350 void fail(String msg) {err.println(msg); fail();} unexpected(Throwable t)351 void unexpected(Throwable t) {failed++; t.printStackTrace();} check(boolean cond)352 void check(boolean cond) {if (cond) pass(); else fail();} check(boolean cond, String failMessage)353 void check(boolean cond, String failMessage) {if (cond) pass(); else fail(failMessage);} debug(String message)354 void debug(String message) {if(debug) { out.println(Thread.currentThread().getName() + ": " + message); } } sleep(long millis)355 void sleep(long millis) { try { Thread.currentThread().sleep(millis); } 356 catch(InterruptedException ie) { unexpected(ie); }} main(String[] args)357 public static void main(String[] args) throws Throwable { 358 Class<?> k = new Object(){}.getClass().getEnclosingClass(); 359 try {k.getMethod("instanceMain",String[].class) 360 .invoke( k.newInstance(), (Object) args);} 361 catch (Throwable e) {throw e.getCause();}} instanceMain(String[] args)362 public void instanceMain(String[] args) throws Throwable { 363 try {test(args);} catch (Throwable t) {unexpected(t);} 364 out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); 365 if (failed > 0) throw new AssertionError("Some tests failed");} 366 367 } 368