1 /*
2  * Copyright (c) 2008, 2021, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package sun.nio.ch;
27 
28 import java.nio.channels.Channel;
29 import java.nio.channels.AsynchronousChannelGroup;
30 import java.nio.channels.spi.AsynchronousChannelProvider;
31 import java.io.IOException;
32 import java.io.FileDescriptor;
33 import java.util.Queue;
34 import java.util.concurrent.*;
35 import java.util.concurrent.atomic.AtomicInteger;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.security.PrivilegedAction;
38 import java.security.AccessController;
39 import java.security.AccessControlContext;
40 import sun.security.action.GetIntegerAction;
41 
42 /**
43  * Base implementation of AsynchronousChannelGroup
44  */
45 
46 abstract class AsynchronousChannelGroupImpl
47     extends AsynchronousChannelGroup implements Executor
48 {
49     // number of internal threads handling I/O events when using an unbounded
50     // thread pool. Internal threads do not dispatch to completion handlers.
51     @SuppressWarnings("removal")
52     private static final int internalThreadCount = AccessController.doPrivileged(
53         new GetIntegerAction("sun.nio.ch.internalThreadPoolSize", 1));
54 
55     // associated thread pool
56     private final ThreadPool pool;
57 
58     // number of tasks running (including internal)
59     private final AtomicInteger threadCount = new AtomicInteger();
60 
61     // associated Executor for timeouts
62     private ScheduledThreadPoolExecutor timeoutExecutor;
63 
64     // task queue for when using a fixed thread pool. In that case, a thread
65     // waiting on I/O events must be awoken to poll tasks from this queue.
66     private final Queue<Runnable> taskQueue;
67 
68     // group shutdown
69     private final AtomicBoolean shutdown = new AtomicBoolean();
70     private final Object shutdownNowLock = new Object();
71     private volatile boolean terminateInitiated;
72 
AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider, ThreadPool pool)73     AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider,
74                                  ThreadPool pool)
75     {
76         super(provider);
77         this.pool = pool;
78 
79         if (pool.isFixedThreadPool()) {
80             taskQueue = new ConcurrentLinkedQueue<>();
81         } else {
82             taskQueue = null;   // not used
83         }
84 
85         // use default thread factory as thread should not be visible to
86         // application (it doesn't execute completion handlers).
87         this.timeoutExecutor = (ScheduledThreadPoolExecutor)
88             Executors.newScheduledThreadPool(1, ThreadPool.defaultThreadFactory());
89         this.timeoutExecutor.setRemoveOnCancelPolicy(true);
90     }
91 
executor()92     final ExecutorService executor() {
93         return pool.executor();
94     }
95 
isFixedThreadPool()96     final boolean isFixedThreadPool() {
97         return pool.isFixedThreadPool();
98     }
99 
fixedThreadCount()100     final int fixedThreadCount() {
101         if (isFixedThreadPool()) {
102             return pool.poolSize();
103         } else {
104             return pool.poolSize() + internalThreadCount;
105         }
106     }
107 
bindToGroup(final Runnable task)108     private Runnable bindToGroup(final Runnable task) {
109         final AsynchronousChannelGroupImpl thisGroup = this;
110         return new Runnable() {
111             public void run() {
112                 Invoker.bindToGroup(thisGroup);
113                 task.run();
114             }
115         };
116     }
117 
118     @SuppressWarnings("removal")
119     private void startInternalThread(final Runnable task) {
120         AccessController.doPrivileged(new PrivilegedAction<>() {
121             @Override
122             public Void run() {
123                 // internal threads should not be visible to application so
124                 // cannot use user-supplied thread factory
125                 ThreadPool.defaultThreadFactory().newThread(task).start();
126                 return null;
127             }
128          });
129     }
130 
131     protected final void startThreads(Runnable task) {
132         if (!isFixedThreadPool()) {
133             for (int i=0; i<internalThreadCount; i++) {
134                 startInternalThread(task);
135                 threadCount.incrementAndGet();
136             }
137         }
138         if (pool.poolSize() > 0) {
139             task = bindToGroup(task);
140             try {
141                 for (int i=0; i<pool.poolSize(); i++) {
142                     pool.executor().execute(task);
143                     threadCount.incrementAndGet();
144                 }
145             } catch (RejectedExecutionException  x) {
146                 // nothing we can do
147             }
148         }
149     }
150 
151     final int threadCount() {
152         return threadCount.get();
153     }
154 
155     /**
156      * Invoked by tasks as they terminate
157      */
158     final int threadExit(Runnable task, boolean replaceMe) {
159         if (replaceMe) {
160             try {
161                 if (Invoker.isBoundToAnyGroup()) {
162                     // submit new task to replace this thread
163                     pool.executor().execute(bindToGroup(task));
164                 } else {
165                     // replace internal thread
166                     startInternalThread(task);
167                 }
168                 return threadCount.get();
169             } catch (RejectedExecutionException x) {
170                 // unable to replace
171             }
172         }
173         return threadCount.decrementAndGet();
174     }
175 
176     /**
177      * Wakes up a thread waiting for I/O events to execute the given task.
178      */
179     abstract void executeOnHandlerTask(Runnable task);
180 
181     /**
182      * For a fixed thread pool the task is queued to a thread waiting on I/O
183      * events. For other thread pools we simply submit the task to the thread
184      * pool.
185      */
186     final void executeOnPooledThread(Runnable task) {
187         if (isFixedThreadPool()) {
188             executeOnHandlerTask(task);
189         } else {
190             pool.executor().execute(bindToGroup(task));
191         }
192     }
193 
194     final void offerTask(Runnable task) {
195         taskQueue.offer(task);
196     }
197 
198     final Runnable pollTask() {
199         return (taskQueue == null) ? null : taskQueue.poll();
200     }
201 
202     final Future<?> schedule(Runnable task, long timeout, TimeUnit unit) {
203         try {
204             return timeoutExecutor.schedule(task, timeout, unit);
205         } catch (RejectedExecutionException rej) {
206             if (terminateInitiated) {
207                 // no timeout scheduled as group is terminating
208                 return null;
209             }
210             throw new AssertionError(rej);
211         }
212     }
213 
214     @Override
215     public final boolean isShutdown() {
216         return shutdown.get();
217     }
218 
219     @Override
220     public final boolean isTerminated()  {
221         return pool.executor().isTerminated();
222     }
223 
224     /**
225      * Returns true if there are no channels in the group
226      */
227     abstract boolean isEmpty();
228 
229     /**
230      * Attaches a foreign channel to this group.
231      */
232     abstract Object attachForeignChannel(Channel channel, FileDescriptor fdo)
233         throws IOException;
234 
235     /**
236      * Detaches a foreign channel from this group.
237      */
238     abstract void detachForeignChannel(Object key);
239 
240     /**
241      * Closes all channels in the group
242      */
243     abstract void closeAllChannels() throws IOException;
244 
245     /**
246      * Shutdown all tasks waiting for I/O events.
247      */
248     abstract void shutdownHandlerTasks();
249 
250     @SuppressWarnings("removal")
251     private void shutdownExecutors() {
252         AccessController.doPrivileged(
253             new PrivilegedAction<>() {
254                 public Void run() {
255                     pool.executor().shutdown();
256                     timeoutExecutor.shutdown();
257                     return null;
258                 }
259             },
260             null,
261             new RuntimePermission("modifyThread"));
262     }
263 
264     @Override
265     public final void shutdown() {
266         if (shutdown.getAndSet(true)) {
267             // already shutdown
268             return;
269         }
270         // if there are channels in the group then shutdown will continue
271         // when the last channel is closed
272         if (!isEmpty()) {
273             return;
274         }
275         // initiate termination (acquire shutdownNowLock to ensure that other
276         // threads invoking shutdownNow will block).
277         synchronized (shutdownNowLock) {
278             if (!terminateInitiated) {
279                 terminateInitiated = true;
280                 shutdownHandlerTasks();
281                 shutdownExecutors();
282             }
283         }
284     }
285 
286     @Override
287     public final void shutdownNow() throws IOException {
288         shutdown.set(true);
289         synchronized (shutdownNowLock) {
290             if (!terminateInitiated) {
291                 terminateInitiated = true;
292                 closeAllChannels();
293                 shutdownHandlerTasks();
294                 shutdownExecutors();
295             }
296         }
297     }
298 
299     /**
300      * For use by AsynchronousFileChannel to release resources without shutting
301      * down the thread pool.
302      */
303     final void detachFromThreadPool() {
304         if (shutdown.getAndSet(true))
305             throw new AssertionError("Already shutdown");
306         if (!isEmpty())
307             throw new AssertionError("Group not empty");
308         shutdownHandlerTasks();
309     }
310 
311     @Override
312     public final boolean awaitTermination(long timeout, TimeUnit unit)
313         throws InterruptedException
314     {
315         return pool.executor().awaitTermination(timeout, unit);
316     }
317 
318     /**
319      * Executes the given command on one of the channel group's pooled threads.
320      */
321     @Override
322     public final void execute(Runnable task) {
323         @SuppressWarnings("removal")
324         SecurityManager sm = System.getSecurityManager();
325         if (sm != null) {
326             // when a security manager is installed then the user's task
327             // must be run with the current calling context
328             @SuppressWarnings("removal")
329             final AccessControlContext acc = AccessController.getContext();
330             final Runnable delegate = task;
331             task = new Runnable() {
332                 @SuppressWarnings("removal")
333                 @Override
334                 public void run() {
335                     AccessController.doPrivileged(new PrivilegedAction<>() {
336                         @Override
337                         public Void run() {
338                             delegate.run();
339                             return null;
340                         }
341                     }, acc);
342                 }
343             };
344         }
345         executeOnPooledThread(task);
346     }
347 }
348