1 /*
2  * Copyright (c) 2008, 2016, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 
24 /* @test
25  * @bug 4607272 6842687
26  * @summary Unit test for AsynchronousChannelGroup
27  * @key randomness
28  */
29 
30 import java.nio.channels.*;
31 import java.net.*;
32 import java.util.*;
33 import java.util.concurrent.*;
34 import java.util.concurrent.atomic.*;
35 import java.io.IOException;
36 
37 /**
38  * Exercise replacement of threads in the thread pool when completion handlers
39  * terminate due to errors or runtime exceptions.
40  */
41 
42 public class Restart {
43     static final Random rand = new Random();
44 
main(String[] args)45     public static void main(String[] args) throws Exception {
46         // thread group for thread pools
47         final ThreadGroup tg = new ThreadGroup("test");
48 
49         // keep track of the number of threads that terminate
50         final AtomicInteger exceptionCount = new AtomicInteger(0);
51         final Thread.UncaughtExceptionHandler ueh =
52             new Thread.UncaughtExceptionHandler() {
53                 public void uncaughtException(Thread t, Throwable e) {
54                     exceptionCount.incrementAndGet();
55                 }
56             };
57         ThreadFactory factory = new ThreadFactory() {
58             @Override
59             public Thread newThread(Runnable r) {
60                 Thread t = new Thread(tg, r);
61                 t.setUncaughtExceptionHandler(ueh);
62                 return t;
63             }
64         };
65 
66         // group with fixed thread pool
67         int nThreads = 1 + rand.nextInt(4);
68         AsynchronousChannelGroup group =
69                 AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory);
70         try {
71             testRestart(group, 100);
72         } finally {
73             group.shutdown();
74         }
75 
76         // group with cached thread pool
77         ExecutorService pool = Executors.newCachedThreadPool(factory);
78         group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5));
79         try {
80             testRestart(group, 100);
81         } finally {
82             group.shutdown();
83         }
84 
85         // group with custom thread pool
86         group = AsynchronousChannelGroup.withThreadPool(
87                 Executors.newFixedThreadPool(1+rand.nextInt(5), factory));
88         try {
89             testRestart(group, 100);
90         } finally {
91             group.shutdown();
92         }
93 
94         // give time for threads to terminate
95         Thread.sleep(3000);
96         int actual = exceptionCount.get();
97         if (actual != 300)
98             throw new RuntimeException(actual + " exceptions, expected: " + 300);
99     }
100 
testRestart(AsynchronousChannelGroup group, int count)101     static void testRestart(AsynchronousChannelGroup group, int count)
102         throws Exception
103     {
104         try (AsynchronousServerSocketChannel listener =
105                 AsynchronousServerSocketChannel.open(group)) {
106 
107             listener.bind(new InetSocketAddress(0));
108             for (int i=0; i<count; i++) {
109                 final CountDownLatch latch = new CountDownLatch(1);
110 
111                 listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
112                     public void completed(AsynchronousSocketChannel ch, Void att) {
113                         try {
114                             ch.close();
115                         } catch (IOException ignore) { }
116 
117                         latch.countDown();
118 
119                         // throw error or runtime exception
120                         if (rand.nextBoolean()) {
121                             throw new Error();
122                         } else {
123                             throw new RuntimeException();
124                         }
125                     }
126                     public void failed(Throwable exc, Void att) {
127                     }
128                 });
129 
130                 // establish loopback connection which should cause completion
131                 // handler to be invoked.
132                 int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
133                 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
134                     InetAddress lh = InetAddress.getLocalHost();
135                     ch.connect(new InetSocketAddress(lh, port)).get();
136                 }
137 
138                 // wait for handler to be invoked
139                 latch.await();
140             }
141         }
142     }
143 }
144