1 /* 2 * Copyright (c) 2002, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* @test 25 * @bug 4460583 4470470 4840199 6419424 6710579 6596323 6824135 6395224 7142919 26 * 8151582 8068693 8153209 27 * @run main/othervm AsyncCloseAndInterrupt 28 * @key intermittent 29 * @summary Comprehensive test of asynchronous closing and interruption 30 * @author Mark Reinhold 31 */ 32 33 import java.io.*; 34 import java.net.*; 35 import java.nio.channels.*; 36 import java.nio.ByteBuffer; 37 import java.util.ArrayList; 38 import java.util.List; 39 import java.util.concurrent.ExecutorService; 40 import java.util.concurrent.Executors; 41 import java.util.concurrent.ThreadFactory; 42 import java.util.concurrent.Callable; 43 import java.util.concurrent.Future; 44 import java.util.concurrent.TimeUnit; 45 46 public class AsyncCloseAndInterrupt { 47 48 static PrintStream log = System.err; 49 sleep(int ms)50 static void sleep(int ms) { 51 try { 52 Thread.sleep(ms); 53 } catch (InterruptedException x) { } 54 } 55 56 // Wildcard address localized to this machine -- Windoze doesn't allow 57 // connecting to a server socket that was previously bound to a true 58 // wildcard, namely new InetSocketAddress((InetAddress)null, 0). 59 // 60 private static InetSocketAddress wildcardAddress; 61 62 63 // Server socket that blindly accepts all connections 64 65 static ServerSocketChannel acceptor; 66 initAcceptor()67 private static void initAcceptor() throws IOException { 68 acceptor = ServerSocketChannel.open(); 69 acceptor.socket().bind(wildcardAddress); 70 71 Thread th = new Thread("Acceptor") { 72 public void run() { 73 try { 74 for (;;) { 75 SocketChannel sc = acceptor.accept(); 76 } 77 } catch (IOException x) { 78 x.printStackTrace(); 79 } 80 } 81 }; 82 83 th.setDaemon(true); 84 th.start(); 85 } 86 87 88 // Server socket that refuses all connections 89 90 static ServerSocketChannel refuser; 91 initRefuser()92 private static void initRefuser() throws IOException { 93 refuser = ServerSocketChannel.open(); 94 refuser.bind(wildcardAddress, 1); // use minimum backlog 95 } 96 97 // Dead pipe source and sink 98 99 static Pipe.SourceChannel deadSource; 100 static Pipe.SinkChannel deadSink; 101 initPipes()102 private static void initPipes() throws IOException { 103 if (deadSource != null) 104 deadSource.close(); 105 deadSource = Pipe.open().source(); 106 if (deadSink != null) 107 deadSink.close(); 108 deadSink = Pipe.open().sink(); 109 } 110 111 112 // Files 113 114 private static File fifoFile = null; // File that blocks on reads and writes 115 private static File diskFile = null; // Disk file 116 initFile()117 private static void initFile() throws Exception { 118 119 diskFile = File.createTempFile("aci", ".tmp"); 120 diskFile.deleteOnExit(); 121 FileChannel fc = new FileOutputStream(diskFile).getChannel(); 122 buffer.clear(); 123 if (fc.write(buffer) != buffer.capacity()) 124 throw new RuntimeException("Cannot create disk file"); 125 fc.close(); 126 127 if (TestUtil.onWindows()) { 128 log.println("WARNING: Cannot completely test FileChannels on Windows"); 129 return; 130 } 131 fifoFile = new File("x.fifo"); 132 if (fifoFile.exists()) { 133 if (!fifoFile.delete()) 134 throw new IOException("Cannot delete existing fifo " + fifoFile); 135 } 136 Process p = Runtime.getRuntime().exec("mkfifo " + fifoFile); 137 if (p.waitFor() != 0) 138 throw new IOException("Error creating fifo"); 139 new RandomAccessFile(fifoFile, "rw").close(); 140 141 } 142 143 144 // Channel factories 145 146 static abstract class ChannelFactory { 147 private final String name; ChannelFactory(String name)148 ChannelFactory(String name) { 149 this.name = name; 150 } toString()151 public String toString() { 152 return name; 153 } create()154 abstract InterruptibleChannel create() throws IOException; 155 } 156 157 static ChannelFactory socketChannelFactory 158 = new ChannelFactory("SocketChannel") { 159 InterruptibleChannel create() throws IOException { 160 return SocketChannel.open(); 161 } 162 }; 163 164 static ChannelFactory connectedSocketChannelFactory 165 = new ChannelFactory("SocketChannel") { 166 InterruptibleChannel create() throws IOException { 167 SocketAddress sa = acceptor.socket().getLocalSocketAddress(); 168 return SocketChannel.open(sa); 169 } 170 }; 171 172 static ChannelFactory serverSocketChannelFactory 173 = new ChannelFactory("ServerSocketChannel") { 174 InterruptibleChannel create() throws IOException { 175 ServerSocketChannel ssc = ServerSocketChannel.open(); 176 ssc.socket().bind(wildcardAddress); 177 return ssc; 178 } 179 }; 180 181 static ChannelFactory datagramChannelFactory 182 = new ChannelFactory("DatagramChannel") { 183 InterruptibleChannel create() throws IOException { 184 DatagramChannel dc = DatagramChannel.open(); 185 InetAddress lb = InetAddress.getByName("127.0.0.1"); 186 dc.bind(new InetSocketAddress(lb, 0)); 187 dc.connect(new InetSocketAddress(lb, 80)); 188 return dc; 189 } 190 }; 191 192 static ChannelFactory pipeSourceChannelFactory 193 = new ChannelFactory("Pipe.SourceChannel") { 194 InterruptibleChannel create() throws IOException { 195 // ## arrange to close sink 196 return Pipe.open().source(); 197 } 198 }; 199 200 static ChannelFactory pipeSinkChannelFactory 201 = new ChannelFactory("Pipe.SinkChannel") { 202 InterruptibleChannel create() throws IOException { 203 // ## arrange to close source 204 return Pipe.open().sink(); 205 } 206 }; 207 208 static ChannelFactory fifoFileChannelFactory 209 = new ChannelFactory("FileChannel") { 210 InterruptibleChannel create() throws IOException { 211 return new RandomAccessFile(fifoFile, "rw").getChannel(); 212 } 213 }; 214 215 static ChannelFactory diskFileChannelFactory 216 = new ChannelFactory("FileChannel") { 217 InterruptibleChannel create() throws IOException { 218 return new RandomAccessFile(diskFile, "rw").getChannel(); 219 } 220 }; 221 222 223 // I/O operations 224 225 static abstract class Op { 226 private final String name; Op(String name)227 protected Op(String name) { 228 this.name = name; 229 } doIO(InterruptibleChannel ich)230 abstract void doIO(InterruptibleChannel ich) throws IOException; setup()231 void setup() throws IOException { } toString()232 public String toString() { return name; } 233 } 234 235 static ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20); 236 237 static ByteBuffer[] buffers = new ByteBuffer[] { 238 ByteBuffer.allocateDirect(1 << 19), 239 ByteBuffer.allocateDirect(1 << 19) 240 }; 241 clearBuffers()242 static void clearBuffers() { 243 buffers[0].clear(); 244 buffers[1].clear(); 245 } 246 show(Channel ch)247 static void show(Channel ch) { 248 log.print("Channel " + (ch.isOpen() ? "open" : "closed")); 249 if (ch.isOpen() && (ch instanceof SocketChannel)) { 250 SocketChannel sc = (SocketChannel)ch; 251 if (sc.socket().isInputShutdown()) 252 log.print(", input shutdown"); 253 if (sc.socket().isOutputShutdown()) 254 log.print(", output shutdown"); 255 } 256 log.println(); 257 } 258 259 static final Op READ = new Op("read") { 260 void doIO(InterruptibleChannel ich) throws IOException { 261 ReadableByteChannel rbc = (ReadableByteChannel)ich; 262 buffer.clear(); 263 int n = rbc.read(buffer); 264 log.println("Read returned " + n); 265 show(rbc); 266 if (rbc.isOpen() 267 && (n == -1) 268 && (rbc instanceof SocketChannel) 269 && ((SocketChannel)rbc).socket().isInputShutdown()) { 270 return; 271 } 272 throw new RuntimeException("Read succeeded"); 273 } 274 }; 275 276 static final Op READV = new Op("readv") { 277 void doIO(InterruptibleChannel ich) throws IOException { 278 ScatteringByteChannel sbc = (ScatteringByteChannel)ich; 279 clearBuffers(); 280 int n = (int)sbc.read(buffers); 281 log.println("Read returned " + n); 282 show(sbc); 283 if (sbc.isOpen() 284 && (n == -1) 285 && (sbc instanceof SocketChannel) 286 && ((SocketChannel)sbc).socket().isInputShutdown()) { 287 return; 288 } 289 throw new RuntimeException("Read succeeded"); 290 } 291 }; 292 293 static final Op RECEIVE = new Op("receive") { 294 void doIO(InterruptibleChannel ich) throws IOException { 295 DatagramChannel dc = (DatagramChannel)ich; 296 buffer.clear(); 297 dc.receive(buffer); 298 show(dc); 299 throw new RuntimeException("Read succeeded"); 300 } 301 }; 302 303 static final Op WRITE = new Op("write") { 304 void doIO(InterruptibleChannel ich) throws IOException { 305 306 WritableByteChannel wbc = (WritableByteChannel)ich; 307 308 SocketChannel sc = null; 309 if (wbc instanceof SocketChannel) 310 sc = (SocketChannel)wbc; 311 312 int n = 0; 313 for (;;) { 314 buffer.clear(); 315 int d = wbc.write(buffer); 316 n += d; 317 if (!wbc.isOpen()) 318 break; 319 if ((sc != null) && sc.socket().isOutputShutdown()) 320 break; 321 } 322 log.println("Wrote " + n + " bytes"); 323 show(wbc); 324 } 325 }; 326 327 static final Op WRITEV = new Op("writev") { 328 void doIO(InterruptibleChannel ich) throws IOException { 329 330 GatheringByteChannel gbc = (GatheringByteChannel)ich; 331 332 SocketChannel sc = null; 333 if (gbc instanceof SocketChannel) 334 sc = (SocketChannel)gbc; 335 336 int n = 0; 337 for (;;) { 338 clearBuffers(); 339 int d = (int)gbc.write(buffers); 340 n += d; 341 if (!gbc.isOpen()) 342 break; 343 if ((sc != null) && sc.socket().isOutputShutdown()) 344 break; 345 } 346 log.println("Wrote " + n + " bytes"); 347 show(gbc); 348 349 } 350 }; 351 352 static final Op CONNECT = new Op("connect") { 353 void setup() { 354 waitPump("connect waiting for pumping refuser ..."); 355 } 356 void doIO(InterruptibleChannel ich) throws IOException { 357 SocketChannel sc = (SocketChannel)ich; 358 if (sc.connect(refuser.socket().getLocalSocketAddress())) 359 throw new RuntimeException("Connection succeeded"); 360 throw new RuntimeException("Connection did not block"); 361 } 362 }; 363 364 static final Op FINISH_CONNECT = new Op("finishConnect") { 365 void setup() { 366 waitPump("finishConnect waiting for pumping refuser ..."); 367 } 368 void doIO(InterruptibleChannel ich) throws IOException { 369 SocketChannel sc = (SocketChannel)ich; 370 sc.configureBlocking(false); 371 SocketAddress sa = refuser.socket().getLocalSocketAddress(); 372 if (sc.connect(sa)) 373 throw new RuntimeException("Connection succeeded"); 374 sc.configureBlocking(true); 375 if (sc.finishConnect()) 376 throw new RuntimeException("Connection succeeded"); 377 throw new RuntimeException("Connection did not block"); 378 } 379 }; 380 381 static final Op ACCEPT = new Op("accept") { 382 void doIO(InterruptibleChannel ich) throws IOException { 383 ServerSocketChannel ssc = (ServerSocketChannel)ich; 384 ssc.accept(); 385 throw new RuntimeException("Accept succeeded"); 386 } 387 }; 388 389 // Use only with diskFileChannelFactory 390 static final Op TRANSFER_TO = new Op("transferTo") { 391 void doIO(InterruptibleChannel ich) throws IOException { 392 FileChannel fc = (FileChannel)ich; 393 long n = fc.transferTo(0, fc.size(), deadSink); 394 log.println("Transferred " + n + " bytes"); 395 show(fc); 396 } 397 }; 398 399 // Use only with diskFileChannelFactory 400 static final Op TRANSFER_FROM = new Op("transferFrom") { 401 void doIO(InterruptibleChannel ich) throws IOException { 402 FileChannel fc = (FileChannel)ich; 403 long n = fc.transferFrom(deadSource, 0, 1 << 20); 404 log.println("Transferred " + n + " bytes"); 405 show(fc); 406 } 407 }; 408 409 410 411 // Test modes 412 413 static final int TEST_PREINTR = 0; // Interrupt thread before I/O 414 static final int TEST_INTR = 1; // Interrupt thread during I/O 415 static final int TEST_CLOSE = 2; // Close channel during I/O 416 static final int TEST_SHUTI = 3; // Shutdown input during I/O 417 static final int TEST_SHUTO = 4; // Shutdown output during I/O 418 419 static final String[] testName = new String[] { 420 "pre-interrupt", "interrupt", "close", 421 "shutdown-input", "shutdown-output" 422 }; 423 424 425 static class Tester extends TestThread { 426 427 private InterruptibleChannel ch; 428 private Op op; 429 private int test; 430 volatile boolean ready = false; 431 Tester(ChannelFactory cf, InterruptibleChannel ch, Op op, int test)432 protected Tester(ChannelFactory cf, InterruptibleChannel ch, 433 Op op, int test) 434 { 435 super(cf + "/" + op + "/" + testName[test]); 436 this.ch = ch; 437 this.op = op; 438 this.test = test; 439 } 440 441 @SuppressWarnings("fallthrough") caught(Channel ch, IOException x)442 private void caught(Channel ch, IOException x) { 443 String xn = x.getClass().getName(); 444 switch (test) { 445 446 case TEST_PREINTR: 447 case TEST_INTR: 448 if (!xn.equals("java.nio.channels.ClosedByInterruptException")) 449 throw new RuntimeException("Wrong exception thrown: " + x); 450 break; 451 452 case TEST_CLOSE: 453 case TEST_SHUTO: 454 if (!xn.equals("java.nio.channels.AsynchronousCloseException")) 455 throw new RuntimeException("Wrong exception thrown: " + x); 456 break; 457 458 case TEST_SHUTI: 459 if (TestUtil.onWindows()) 460 break; 461 // FALL THROUGH 462 463 default: 464 throw new Error(x); 465 } 466 467 if (ch.isOpen()) { 468 if (test == TEST_SHUTO) { 469 SocketChannel sc = (SocketChannel)ch; 470 if (!sc.socket().isOutputShutdown()) 471 throw new RuntimeException("Output not shutdown"); 472 } else if ((test == TEST_INTR) && (op == TRANSFER_FROM)) { 473 // Let this case pass -- CBIE applies to other channel 474 } else { 475 throw new RuntimeException("Channel still open"); 476 } 477 } 478 479 log.println("Thrown as expected: " + x); 480 } 481 go()482 final void go() throws Exception { 483 if (test == TEST_PREINTR) 484 Thread.currentThread().interrupt(); 485 ready = true; 486 try { 487 op.doIO(ch); 488 } catch (ClosedByInterruptException x) { 489 caught(ch, x); 490 } catch (AsynchronousCloseException x) { 491 caught(ch, x); 492 } finally { 493 ch.close(); 494 } 495 } 496 497 } 498 499 private static volatile boolean pumpDone = false; 500 private static volatile boolean pumpReady = false; 501 waitPump(String msg)502 private static void waitPump(String msg){ 503 log.println(msg); 504 while (!pumpReady){ 505 sleep(200); 506 } 507 log.println(msg + " done"); 508 } 509 510 // Create a pump thread dedicated to saturate refuser's connection backlog pumpRefuser(ExecutorService pumperExecutor)511 private static Future<Integer> pumpRefuser(ExecutorService pumperExecutor) { 512 513 Callable<Integer> pumpTask = new Callable<Integer>() { 514 515 @Override 516 public Integer call() throws IOException { 517 // Can't reliably saturate connection backlog on Windows Server editions 518 assert !TestUtil.onWindows(); 519 log.println("Start pumping refuser ..."); 520 List<SocketChannel> refuserClients = new ArrayList<>(); 521 522 // Saturate the refuser's connection backlog so that further connection 523 // attempts will be blocked 524 pumpReady = false; 525 while (!pumpDone) { 526 SocketChannel sc = SocketChannel.open(); 527 sc.configureBlocking(false); 528 boolean connected = sc.connect(refuser.socket().getLocalSocketAddress()); 529 530 // Assume that the connection backlog is saturated if a 531 // client cannot connect to the refuser within 50 milliseconds 532 long start = System.currentTimeMillis(); 533 while (!pumpReady && !connected 534 && (System.currentTimeMillis() - start < 50)) { 535 connected = sc.finishConnect(); 536 } 537 538 if (connected) { 539 // Retain so that finalizer doesn't close 540 refuserClients.add(sc); 541 } else { 542 sc.close(); 543 pumpReady = true; 544 } 545 } 546 547 for (SocketChannel sc : refuserClients) { 548 sc.close(); 549 } 550 refuser.close(); 551 552 log.println("Stop pumping refuser ..."); 553 return refuserClients.size(); 554 } 555 }; 556 557 return pumperExecutor.submit(pumpTask); 558 } 559 560 // Test test(ChannelFactory cf, Op op, int test)561 static void test(ChannelFactory cf, Op op, int test) throws Exception { 562 test(cf, op, test, true); 563 } 564 test(ChannelFactory cf, Op op, int test, boolean extraSleep)565 static void test(ChannelFactory cf, Op op, int test, boolean extraSleep) 566 throws Exception 567 { 568 log.println(); 569 initPipes(); 570 InterruptibleChannel ch = cf.create(); 571 Tester t = new Tester(cf, ch, op, test); 572 log.println(t); 573 op.setup(); 574 t.start(); 575 do { 576 sleep(50); 577 } while (!t.ready); 578 579 if (extraSleep) { 580 sleep(100); 581 } 582 583 switch (test) { 584 585 case TEST_INTR: 586 t.interrupt(); 587 break; 588 589 case TEST_CLOSE: 590 ch.close(); 591 break; 592 593 case TEST_SHUTI: 594 if (TestUtil.onWindows()) { 595 log.println("WARNING: Asynchronous shutdown not working on Windows"); 596 ch.close(); 597 } else { 598 ((SocketChannel)ch).socket().shutdownInput(); 599 } 600 break; 601 602 case TEST_SHUTO: 603 if (TestUtil.onWindows()) { 604 log.println("WARNING: Asynchronous shutdown not working on Windows"); 605 ch.close(); 606 } else { 607 ((SocketChannel)ch).socket().shutdownOutput(); 608 } 609 break; 610 611 default: 612 break; 613 } 614 615 t.finishAndThrow(10000); 616 } 617 test(ChannelFactory cf, Op op)618 static void test(ChannelFactory cf, Op op) throws Exception { 619 test(cf, op, true); 620 } 621 test(ChannelFactory cf, Op op, boolean extraSleep)622 static void test(ChannelFactory cf, Op op, boolean extraSleep) throws Exception { 623 // Test INTR cases before PREINTER cases since sometimes 624 // interrupted threads can't load classes 625 test(cf, op, TEST_INTR, extraSleep); 626 test(cf, op, TEST_PREINTR, extraSleep); 627 628 // Bugs, see FileChannelImpl for details 629 if (op == TRANSFER_FROM) { 630 log.println("WARNING: transferFrom/close not tested"); 631 return; 632 } 633 if ((op == TRANSFER_TO) && !TestUtil.onWindows()) { 634 log.println("WARNING: transferTo/close not tested"); 635 return; 636 } 637 638 test(cf, op, TEST_CLOSE, extraSleep); 639 } 640 test(ChannelFactory cf)641 static void test(ChannelFactory cf) 642 throws Exception 643 { 644 InterruptibleChannel ch = cf.create(); // Sample channel 645 ch.close(); 646 647 if (ch instanceof ReadableByteChannel) { 648 test(cf, READ); 649 if (ch instanceof SocketChannel) 650 test(cf, READ, TEST_SHUTI); 651 } 652 653 if (ch instanceof ScatteringByteChannel) { 654 test(cf, READV); 655 if (ch instanceof SocketChannel) 656 test(cf, READV, TEST_SHUTI); 657 } 658 659 if (ch instanceof DatagramChannel) { 660 test(cf, RECEIVE); 661 662 // Return here: We can't effectively test writes since, if they 663 // block, they do so only for a fleeting moment unless the network 664 // interface is overloaded. 665 return; 666 667 } 668 669 if (ch instanceof WritableByteChannel) { 670 test(cf, WRITE); 671 if (ch instanceof SocketChannel) 672 test(cf, WRITE, TEST_SHUTO); 673 } 674 675 if (ch instanceof GatheringByteChannel) { 676 test(cf, WRITEV); 677 if (ch instanceof SocketChannel) 678 test(cf, WRITEV, TEST_SHUTO); 679 } 680 681 } 682 main(String[] args)683 public static void main(String[] args) throws Exception { 684 685 wildcardAddress = new InetSocketAddress(InetAddress.getLocalHost(), 0); 686 initAcceptor(); 687 if (!TestUtil.onWindows()) 688 initRefuser(); 689 initPipes(); 690 initFile(); 691 692 if (TestUtil.onWindows()) { 693 log.println("WARNING: Cannot test FileChannel transfer operations" 694 + " on Windows"); 695 } else { 696 test(diskFileChannelFactory, TRANSFER_TO); 697 test(diskFileChannelFactory, TRANSFER_FROM); 698 } 699 if (fifoFile != null) 700 test(fifoFileChannelFactory); 701 702 // Testing positional file reads and writes is impractical: It requires 703 // access to a large file soft-mounted via NFS, and even then isn't 704 // completely guaranteed to work. 705 // 706 // Testing map is impractical and arguably unnecessary: It's 707 // unclear under what conditions mmap(2) will actually block. 708 709 test(connectedSocketChannelFactory); 710 711 if (TestUtil.onWindows() || TestUtil.onSolaris() || TestUtil.onBSD()) { 712 log.println("WARNING Cannot reliably test connect/finishConnect" 713 + " operations on this platform"); 714 } else { 715 // Only the following tests need refuser's connection backlog 716 // to be saturated 717 ExecutorService pumperExecutor = 718 Executors.newSingleThreadExecutor( 719 new ThreadFactory() { 720 721 @Override 722 public Thread newThread(Runnable r) { 723 Thread t = new Thread(r); 724 t.setDaemon(true); 725 t.setName("Pumper"); 726 return t; 727 } 728 }); 729 730 pumpDone = false; 731 try { 732 Future<Integer> pumpFuture = pumpRefuser(pumperExecutor); 733 waitPump("\nWait for initial Pump"); 734 735 test(socketChannelFactory, CONNECT, false); 736 test(socketChannelFactory, FINISH_CONNECT, false); 737 738 pumpDone = true; 739 Integer newConn = pumpFuture.get(30, TimeUnit.SECONDS); 740 log.println("Pump " + newConn + " connections."); 741 } finally { 742 pumperExecutor.shutdown(); 743 } 744 } 745 746 test(serverSocketChannelFactory, ACCEPT); 747 test(datagramChannelFactory); 748 test(pipeSourceChannelFactory); 749 test(pipeSinkChannelFactory); 750 } 751 } 752