1 /* 2 * Copyright (c) 2008, 2021, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.Channel; 29 import java.nio.channels.AsynchronousChannelGroup; 30 import java.nio.channels.spi.AsynchronousChannelProvider; 31 import java.io.IOException; 32 import java.io.FileDescriptor; 33 import java.util.Queue; 34 import java.util.concurrent.*; 35 import java.util.concurrent.atomic.AtomicInteger; 36 import java.util.concurrent.atomic.AtomicBoolean; 37 import java.security.PrivilegedAction; 38 import java.security.AccessController; 39 import java.security.AccessControlContext; 40 import sun.security.action.GetIntegerAction; 41 42 /** 43 * Base implementation of AsynchronousChannelGroup 44 */ 45 46 abstract class AsynchronousChannelGroupImpl 47 extends AsynchronousChannelGroup implements Executor 48 { 49 // number of internal threads handling I/O events when using an unbounded 50 // thread pool. Internal threads do not dispatch to completion handlers. 51 @SuppressWarnings("removal") 52 private static final int internalThreadCount = AccessController.doPrivileged( 53 new GetIntegerAction("sun.nio.ch.internalThreadPoolSize", 1)); 54 55 // associated thread pool 56 private final ThreadPool pool; 57 58 // number of tasks running (including internal) 59 private final AtomicInteger threadCount = new AtomicInteger(); 60 61 // associated Executor for timeouts 62 private ScheduledThreadPoolExecutor timeoutExecutor; 63 64 // task queue for when using a fixed thread pool. In that case, a thread 65 // waiting on I/O events must be awoken to poll tasks from this queue. 66 private final Queue<Runnable> taskQueue; 67 68 // group shutdown 69 private final AtomicBoolean shutdown = new AtomicBoolean(); 70 private final Object shutdownNowLock = new Object(); 71 private volatile boolean terminateInitiated; 72 AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider, ThreadPool pool)73 AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider, 74 ThreadPool pool) 75 { 76 super(provider); 77 this.pool = pool; 78 79 if (pool.isFixedThreadPool()) { 80 taskQueue = new ConcurrentLinkedQueue<>(); 81 } else { 82 taskQueue = null; // not used 83 } 84 85 // use default thread factory as thread should not be visible to 86 // application (it doesn't execute completion handlers). 87 this.timeoutExecutor = (ScheduledThreadPoolExecutor) 88 Executors.newScheduledThreadPool(1, ThreadPool.defaultThreadFactory()); 89 this.timeoutExecutor.setRemoveOnCancelPolicy(true); 90 } 91 executor()92 final ExecutorService executor() { 93 return pool.executor(); 94 } 95 isFixedThreadPool()96 final boolean isFixedThreadPool() { 97 return pool.isFixedThreadPool(); 98 } 99 fixedThreadCount()100 final int fixedThreadCount() { 101 if (isFixedThreadPool()) { 102 return pool.poolSize(); 103 } else { 104 return pool.poolSize() + internalThreadCount; 105 } 106 } 107 bindToGroup(final Runnable task)108 private Runnable bindToGroup(final Runnable task) { 109 final AsynchronousChannelGroupImpl thisGroup = this; 110 return new Runnable() { 111 public void run() { 112 Invoker.bindToGroup(thisGroup); 113 task.run(); 114 } 115 }; 116 } 117 118 @SuppressWarnings("removal") 119 private void startInternalThread(final Runnable task) { 120 AccessController.doPrivileged(new PrivilegedAction<>() { 121 @Override 122 public Void run() { 123 // internal threads should not be visible to application so 124 // cannot use user-supplied thread factory 125 ThreadPool.defaultThreadFactory().newThread(task).start(); 126 return null; 127 } 128 }); 129 } 130 131 protected final void startThreads(Runnable task) { 132 if (!isFixedThreadPool()) { 133 for (int i=0; i<internalThreadCount; i++) { 134 startInternalThread(task); 135 threadCount.incrementAndGet(); 136 } 137 } 138 if (pool.poolSize() > 0) { 139 task = bindToGroup(task); 140 try { 141 for (int i=0; i<pool.poolSize(); i++) { 142 pool.executor().execute(task); 143 threadCount.incrementAndGet(); 144 } 145 } catch (RejectedExecutionException x) { 146 // nothing we can do 147 } 148 } 149 } 150 151 final int threadCount() { 152 return threadCount.get(); 153 } 154 155 /** 156 * Invoked by tasks as they terminate 157 */ 158 final int threadExit(Runnable task, boolean replaceMe) { 159 if (replaceMe) { 160 try { 161 if (Invoker.isBoundToAnyGroup()) { 162 // submit new task to replace this thread 163 pool.executor().execute(bindToGroup(task)); 164 } else { 165 // replace internal thread 166 startInternalThread(task); 167 } 168 return threadCount.get(); 169 } catch (RejectedExecutionException x) { 170 // unable to replace 171 } 172 } 173 return threadCount.decrementAndGet(); 174 } 175 176 /** 177 * Wakes up a thread waiting for I/O events to execute the given task. 178 */ 179 abstract void executeOnHandlerTask(Runnable task); 180 181 /** 182 * For a fixed thread pool the task is queued to a thread waiting on I/O 183 * events. For other thread pools we simply submit the task to the thread 184 * pool. 185 */ 186 final void executeOnPooledThread(Runnable task) { 187 if (isFixedThreadPool()) { 188 executeOnHandlerTask(task); 189 } else { 190 pool.executor().execute(bindToGroup(task)); 191 } 192 } 193 194 final void offerTask(Runnable task) { 195 taskQueue.offer(task); 196 } 197 198 final Runnable pollTask() { 199 return (taskQueue == null) ? null : taskQueue.poll(); 200 } 201 202 final Future<?> schedule(Runnable task, long timeout, TimeUnit unit) { 203 try { 204 return timeoutExecutor.schedule(task, timeout, unit); 205 } catch (RejectedExecutionException rej) { 206 if (terminateInitiated) { 207 // no timeout scheduled as group is terminating 208 return null; 209 } 210 throw new AssertionError(rej); 211 } 212 } 213 214 @Override 215 public final boolean isShutdown() { 216 return shutdown.get(); 217 } 218 219 @Override 220 public final boolean isTerminated() { 221 return pool.executor().isTerminated(); 222 } 223 224 /** 225 * Returns true if there are no channels in the group 226 */ 227 abstract boolean isEmpty(); 228 229 /** 230 * Attaches a foreign channel to this group. 231 */ 232 abstract Object attachForeignChannel(Channel channel, FileDescriptor fdo) 233 throws IOException; 234 235 /** 236 * Detaches a foreign channel from this group. 237 */ 238 abstract void detachForeignChannel(Object key); 239 240 /** 241 * Closes all channels in the group 242 */ 243 abstract void closeAllChannels() throws IOException; 244 245 /** 246 * Shutdown all tasks waiting for I/O events. 247 */ 248 abstract void shutdownHandlerTasks(); 249 250 @SuppressWarnings("removal") 251 private void shutdownExecutors() { 252 AccessController.doPrivileged( 253 new PrivilegedAction<>() { 254 public Void run() { 255 pool.executor().shutdown(); 256 timeoutExecutor.shutdown(); 257 return null; 258 } 259 }, 260 null, 261 new RuntimePermission("modifyThread")); 262 } 263 264 @Override 265 public final void shutdown() { 266 if (shutdown.getAndSet(true)) { 267 // already shutdown 268 return; 269 } 270 // if there are channels in the group then shutdown will continue 271 // when the last channel is closed 272 if (!isEmpty()) { 273 return; 274 } 275 // initiate termination (acquire shutdownNowLock to ensure that other 276 // threads invoking shutdownNow will block). 277 synchronized (shutdownNowLock) { 278 if (!terminateInitiated) { 279 terminateInitiated = true; 280 shutdownHandlerTasks(); 281 shutdownExecutors(); 282 } 283 } 284 } 285 286 @Override 287 public final void shutdownNow() throws IOException { 288 shutdown.set(true); 289 synchronized (shutdownNowLock) { 290 if (!terminateInitiated) { 291 terminateInitiated = true; 292 closeAllChannels(); 293 shutdownHandlerTasks(); 294 shutdownExecutors(); 295 } 296 } 297 } 298 299 /** 300 * For use by AsynchronousFileChannel to release resources without shutting 301 * down the thread pool. 302 */ 303 final void detachFromThreadPool() { 304 if (shutdown.getAndSet(true)) 305 throw new AssertionError("Already shutdown"); 306 if (!isEmpty()) 307 throw new AssertionError("Group not empty"); 308 shutdownHandlerTasks(); 309 } 310 311 @Override 312 public final boolean awaitTermination(long timeout, TimeUnit unit) 313 throws InterruptedException 314 { 315 return pool.executor().awaitTermination(timeout, unit); 316 } 317 318 /** 319 * Executes the given command on one of the channel group's pooled threads. 320 */ 321 @Override 322 public final void execute(Runnable task) { 323 @SuppressWarnings("removal") 324 SecurityManager sm = System.getSecurityManager(); 325 if (sm != null) { 326 // when a security manager is installed then the user's task 327 // must be run with the current calling context 328 @SuppressWarnings("removal") 329 final AccessControlContext acc = AccessController.getContext(); 330 final Runnable delegate = task; 331 task = new Runnable() { 332 @SuppressWarnings("removal") 333 @Override 334 public void run() { 335 AccessController.doPrivileged(new PrivilegedAction<>() { 336 @Override 337 public Void run() { 338 delegate.run(); 339 return null; 340 } 341 }, acc); 342 } 343 }; 344 } 345 executeOnPooledThread(task); 346 } 347 } 348