/*
 * Copyright (c) 2007, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * @test
 * @bug 6450200 6450205 6450207 6450211
 * @summary Test proper handling of tasks that terminate abruptly
 * @author Martin Buchholz
 */

import java.security.Permission;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/**
 * Submits tasks whose {@code beforeExecute} hook and/or body throw various
 * kinds of {@link Throwable} to a {@link ThreadPoolExecutor} subclass, then
 * checks the executor's bookkeeping (task counts, hook invocation counts,
 * uncaught-exception counts, termination state) against the expected values.
 */
public class ThrowingTasks {
    /** Drives the randomized choices made by the test (shuffling, optional checks). */
    static final Random rnd = new Random();

    /** Per-exception-class counter of uncaught exceptions, backed by a ConcurrentHashMap. */
    @SuppressWarnings("serial")
    static class UncaughtExceptions
        extends ConcurrentHashMap<Class<?>, Integer> {

        /** Atomically adds one to the count recorded for {@code key}. */
        void inc(Class<?> key) {
            merge(key, 1, Integer::sum);
        }
    }

    /** Double-check that Hashtable and ConcurrentHashMap are work-alikes. */
    @SuppressWarnings("serial")
    static class UncaughtExceptionsTable
        extends Hashtable<Class<?>, Integer> {

        /** Adds one to the count recorded for {@code key}, under this table's monitor. */
        synchronized void inc(Class<?> key) {
            Integer v = get(key);
            put(key, (v == null) ? 1 : v + 1);
        }
    }

    static final UncaughtExceptions uncaughtExceptions
        = new UncaughtExceptions();
    static final UncaughtExceptionsTable uncaughtExceptionsTable
        = new UncaughtExceptionsTable();
    static final AtomicInteger totalUncaughtExceptions
        = new AtomicInteger(0);
    // 24 is the total number of uncaught exceptions that realMain asserts
    // (4*throwers.size() + 4 with the five throwers declared below).
    static final CountDownLatch uncaughtExceptionsLatch
        = new CountDownLatch(24);

    /** Thread group that every pool thread is created in; its active count is polled by realMain. */
    static final ThreadGroup tg = new ThreadGroup("Flaky");

    static final RuntimeException rte = new RuntimeException();
    static final Error error = new Error();
    static final Throwable weird = new Throwable();
    static final Exception checkedException = new Exception();

    /** Runnable that throws the given throwable when run, or does nothing if it is null. */
    static class Thrower implements Runnable {
        final Throwable t;

        Thrower(Throwable t) { this.t = t; }

        @Override
        public void run() {
            if (t != null)
                // The type witness makes the compiler treat the throw as unchecked.
                ThrowingTasks.<RuntimeException>uncheckedThrow(t);
        }
    }

    /** One thrower per kind of throwable, plus a null thrower that completes normally. */
    static final List<Thrower> throwers = Arrays.asList(
        new Thrower(null),
        new Thrower(rte),
        new Thrower(error),
        new Thrower(weird),
        new Thrower(checkedException));

    /**
     * Task pairing an action to run from the executor's beforeExecute hook
     * with the action that forms the task body.
     */
    static class Flaky implements Runnable {
        final Runnable beforeExecute;
        final Runnable execute;

        Flaky(Runnable beforeExecute,
              Runnable execute) {
            this.beforeExecute = beforeExecute;
            this.execute = execute;
        }

        @Override
        public void run() { execute.run(); }
    }

    /** Every (beforeExecute, execute) combination of throwers, in random order. */
    static final List<Flaky> flakes = new ArrayList<>();
    static {
        for (Thrower x : throwers)
            for (Thrower y : throwers)
                flakes.add(new Flaky(x, y));
        Collections.shuffle(flakes);
    }

    /** Counted down once per beforeExecute call; reaches zero when every task has started. */
    static final CountDownLatch allStarted = new CountDownLatch(flakes.size());
    /** Released by realMain to let the tasks held back in beforeExecute proceed. */
    static final CountDownLatch allContinue = new CountDownLatch(1);

    /** Security manager that grants every permission. */
    @SuppressWarnings("removal")
    static class PermissiveSecurityManger extends SecurityManager {
        @Override
        public void checkPermission(Permission p) { /* bien sur, Monsieur */ }
    }

    /** Checks that the given executor reports a fully terminated, empty state. */
    static void checkTerminated(ThreadPoolExecutor tpe) {
        try {
            check(tpe.getQueue().isEmpty());
            check(tpe.isShutdown());
            check(tpe.isTerminated());
            check(! tpe.isTerminating());
            equal(tpe.getActiveCount(), 0);
            equal(tpe.getPoolSize(), 0);
            equal(tpe.getTaskCount(), tpe.getCompletedTaskCount());
            check(tpe.awaitTermination(0L, TimeUnit.SECONDS));
        } catch (Throwable t) { unexpected(t); }
    }

    /**
     * Waits for condition to become true, first spin-polling, then sleep-polling.
     *
     * @param waitingForGodot condition to poll until it yields true
     * @throws AssertionError if the waiting thread is interrupted while sleeping
     */
    static void spinAwait(Supplier<Boolean> waitingForGodot) {
        for (int spins = 0; !waitingForGodot.get(); ) {
            // Three yields, then one short sleep, repeating.
            if ((spins = (spins + 1) & 3) > 0) {
                Thread.yield();
            } else {
                try { Thread.sleep(4); }
                catch (InterruptedException unexpected) {
                    throw new AssertionError(unexpected);
                }
            }
        }
    }

    /**
     * Fixed-size pool of 10 threads whose hooks record how often they are
     * invoked and whose threads record every uncaught exception.
     */
    static class CheckingExecutor extends ThreadPoolExecutor {
        private final ReentrantLock lock = new ReentrantLock();

        CheckingExecutor() {
            super(10, 10,
                  1L, TimeUnit.HOURS,
                  new LinkedBlockingQueue<Runnable>(),
                  (ThreadFactory) (runnable) -> {
                      Thread thread = new Thread(tg, runnable);
                      Thread.UncaughtExceptionHandler handler = (t, e) -> {
                          check(! t.isInterrupted());
                          totalUncaughtExceptions.getAndIncrement();
                          uncaughtExceptions.inc(e.getClass());
                          uncaughtExceptionsTable.inc(e.getClass());
                          uncaughtExceptionsLatch.countDown();
                      };
                      thread.setUncaughtExceptionHandler(handler);
                      return thread;
                  });
        }

        /**
         * Counts the task as started, holds the last getCorePoolSize() tasks
         * until allContinue is released, then runs the task's beforeExecute
         * action (which may throw).
         */
        @Override protected void beforeExecute(Thread t, Runnable r) {
            final boolean lessThanCorePoolSize;
            // Add a lock to sync allStarted.countDown() and
            // allStarted.getCount() < getCorePoolSize()
            lock.lock();
            try {
                allStarted.countDown();
                lessThanCorePoolSize = allStarted.getCount() < getCorePoolSize();
            } finally {
                lock.unlock();
            }
            if (lessThanCorePoolSize) {
                try { allContinue.await(); }
                catch (InterruptedException x) { unexpected(x); }
            }
            beforeExecuteCount.getAndIncrement();
            check(! isTerminated());
            ((Flaky)r).beforeExecute.run();
        }

        /** Checks that the throwable reported by the pool is the one the task body threw. */
        @Override protected void afterExecute(Runnable r, Throwable t) {
            //System.out.println(tg.activeCount());
            afterExecuteCount.getAndIncrement();
            check(((Thrower)((Flaky)r).execute).t == t);
            check(! isTerminated());
        }

        /** Records the call and, at random, checks the state visible from inside the hook. */
        @Override protected void terminated() {
            try {
                terminatedCount.getAndIncrement();
                if (rnd.nextBoolean()) {
                    check(isShutdown());
                    check(isTerminating());
                    check(! isTerminated());
                    check(! awaitTermination(0L, TimeUnit.MINUTES));
                }
            } catch (Throwable t) { unexpected(t); }
        }
    }

    static final AtomicInteger beforeExecuteCount = new AtomicInteger(0);
    static final AtomicInteger afterExecuteCount = new AtomicInteger(0);
    static final AtomicInteger terminatedCount = new AtomicInteger(0);

    /**
     * At random, installs the permissive security manager. If the runtime
     * refuses with UnsupportedOperationException, the run continues without
     * one, which is the same configuration as the other random branch.
     */
    @SuppressWarnings("removal")
    private static void maybeInstallSecurityManager() {
        if (!rnd.nextBoolean())
            return;
        try {
            System.setSecurityManager(new PermissiveSecurityManger());
        } catch (UnsupportedOperationException e) {
            System.out.println("Security manager not supported, continuing without one: " + e);
        }
    }

    /**
     * Runs the scenario: submits every flake, releases the held-back tasks,
     * waits for all expected uncaught exceptions, shuts the pool down and
     * verifies the recorded counts.
     */
    private static void realMain(String[] args) throws Throwable {
        maybeInstallSecurityManager();

        CheckingExecutor tpe = new CheckingExecutor();

        for (Runnable task : flakes)
            tpe.execute(task);

        if (rnd.nextBoolean()) {
            allStarted.await();
            equal(tpe.getTaskCount(),
                  (long) flakes.size());
            // The last getCorePoolSize() tasks are still parked in beforeExecute.
            equal(tpe.getCompletedTaskCount(),
                  (long) flakes.size() - tpe.getCorePoolSize());
        }
        allContinue.countDown();

        //System.out.printf("thread count = %d%n", tg.activeCount());
        uncaughtExceptionsLatch.await();

        spinAwait(() -> tg.activeCount() == tpe.getCorePoolSize());

        tpe.shutdown();

        check(tpe.awaitTermination(10L, TimeUnit.MINUTES));
        checkTerminated(tpe);

        List<Map<Class<?>, Integer>> maps = new ArrayList<>();
        maps.add(uncaughtExceptions);
        maps.add(uncaughtExceptionsTable);
        for (Map<Class<?>, Integer> map : maps) {
            equal(map.get(Exception.class), throwers.size() + 1);
            equal(map.get(weird.getClass()), throwers.size() + 1);
            equal(map.get(Error.class), throwers.size() + 1);
            equal(map.get(RuntimeException.class), throwers.size() + 1);
            equal(map.size(), 4);
        }
        equal(totalUncaughtExceptions.get(), 4*throwers.size() + 4);

        equal(beforeExecuteCount.get(), flakes.size());
        equal(afterExecuteCount.get(), throwers.size());
        equal(tpe.getCompletedTaskCount(), (long) flakes.size());
        equal(terminatedCount.get(), 1);

        // check for termination operation idempotence
        tpe.shutdown();
        tpe.shutdownNow();
        check(tpe.awaitTermination(10L, TimeUnit.MINUTES));
        checkTerminated(tpe);
        equal(terminatedCount.get(), 1);
    }

    //--------------------- Infrastructure ---------------------------
    static volatile int passed = 0, failed = 0;

    // check()/unexpected() are called concurrently from pool threads, and ++
    // on a volatile int is not atomic, so all increments are serialized here.
    private static synchronized void countPass() { passed++; }
    private static synchronized void countFailure() { failed++; }

    static void pass() { countPass(); }
    static void fail() { countFailure(); Thread.dumpStack(); }
    static void fail(String msg) { System.out.println(msg); fail(); }
    static void unexpected(Throwable t) { countFailure(); t.printStackTrace(); }
    static void check(boolean cond) { if (cond) pass(); else fail(); }
    static void equal(Object x, Object y) {
        if (x == null ? y == null : x.equals(y)) pass();
        else fail(x + " not equal to " + y);
    }

    public static void main(String[] args) throws Throwable {
        try { realMain(args); } catch (Throwable t) { unexpected(t); }
        System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
        if (failed > 0) throw new AssertionError("Some tests failed");
    }

    /** Throws {@code t} without the compiler enforcing checked-exception rules. */
    @SuppressWarnings("unchecked")
    static <T extends Throwable> void uncheckedThrow(Throwable t) throws T {
        throw (T)t; // rely on vacuous cast
    }
}