1 /*
2  * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package sun.nio.ch;
27 
28 import java.nio.channels.*;
29 import java.nio.channels.spi.AsynchronousChannelProvider;
30 import java.io.Closeable;
31 import java.io.IOException;
32 import java.io.FileDescriptor;
33 import java.util.*;
34 import java.util.concurrent.*;
35 import java.util.concurrent.locks.ReadWriteLock;
36 import java.util.concurrent.locks.ReentrantReadWriteLock;
37 import jdk.internal.misc.Unsafe;
38 
39 /**
40  * Windows implementation of AsynchronousChannelGroup encapsulating an I/O
41  * completion port.
42  */
43 
44 class Iocp extends AsynchronousChannelGroupImpl {
45     private static final Unsafe unsafe = Unsafe.getUnsafe();
46     private static final long INVALID_HANDLE_VALUE  = -1L;
47 
48     // maps completion key to channel
49     private final ReadWriteLock keyToChannelLock = new ReentrantReadWriteLock();
50     private final Map<Integer,OverlappedChannel> keyToChannel =
51         new HashMap<Integer,OverlappedChannel>();
52     private int nextCompletionKey;
53 
54     // handle to completion port
55     private final long port;
56 
57     // true if port has been closed
58     private boolean closed;
59 
60     // the set of "stale" OVERLAPPED structures. These OVERLAPPED structures
61     // relate to I/O operations where the completion notification was not
62     // received in a timely manner after the channel is closed.
63     private final Set<Long> staleIoSet = new HashSet<Long>();
64 
Iocp(AsynchronousChannelProvider provider, ThreadPool pool)65     Iocp(AsynchronousChannelProvider provider, ThreadPool pool)
66         throws IOException
67     {
68         super(provider, pool);
69         this.port =
70           createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, fixedThreadCount());
71         this.nextCompletionKey = 1;
72     }
73 
start()74     Iocp start() {
75         startThreads(new EventHandlerTask());
76         return this;
77     }
78 
79     /*
80      * Channels implements this interface support overlapped I/O and can be
81      * associated with a completion port.
82      */
83     static interface OverlappedChannel extends Closeable {
84         /**
85          * Returns a reference to the pending I/O result.
86          */
getByOverlapped(long overlapped)87         <V,A> PendingFuture<V,A> getByOverlapped(long overlapped);
88     }
89 
90     // release all resources
implClose()91     void implClose() {
92         synchronized (this) {
93             if (closed)
94                 return;
95             closed = true;
96         }
97         close0(port);
98         synchronized (staleIoSet) {
99             for (Long ov: staleIoSet) {
100                 unsafe.freeMemory(ov);
101             }
102             staleIoSet.clear();
103         }
104     }
105 
106     @Override
isEmpty()107     boolean isEmpty() {
108         keyToChannelLock.writeLock().lock();
109         try {
110             return keyToChannel.isEmpty();
111         } finally {
112             keyToChannelLock.writeLock().unlock();
113         }
114     }
115 
116     @Override
attachForeignChannel(final Channel channel, FileDescriptor fdObj)117     final Object attachForeignChannel(final Channel channel, FileDescriptor fdObj)
118         throws IOException
119     {
120         int key = associate(new OverlappedChannel() {
121             public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
122                 return null;
123             }
124             public void close() throws IOException {
125                 channel.close();
126             }
127         }, 0L);
128         return Integer.valueOf(key);
129     }
130 
131     @Override
detachForeignChannel(Object key)132     final void detachForeignChannel(Object key) {
133         disassociate((Integer)key);
134     }
135 
136     @Override
closeAllChannels()137     void closeAllChannels() {
138         /**
139          * On Windows the close operation will close the socket/file handle
140          * and then wait until all outstanding I/O operations have aborted.
141          * This is necessary as each channel's cache of OVERLAPPED structures
142          * can only be freed once all I/O operations have completed. As I/O
143          * completion requires a lookup of the keyToChannel then we must close
144          * the channels when not holding the write lock.
145          */
146         final int MAX_BATCH_SIZE = 32;
147         OverlappedChannel channels[] = new OverlappedChannel[MAX_BATCH_SIZE];
148         int count;
149         do {
150             // grab a batch of up to 32 channels
151             keyToChannelLock.writeLock().lock();
152             count = 0;
153             try {
154                 for (Integer key: keyToChannel.keySet()) {
155                     channels[count++] = keyToChannel.get(key);
156                     if (count >= MAX_BATCH_SIZE)
157                         break;
158                 }
159             } finally {
160                 keyToChannelLock.writeLock().unlock();
161             }
162 
163             // close them
164             for (int i=0; i<count; i++) {
165                 try {
166                     channels[i].close();
167                 } catch (IOException ignore) { }
168             }
169         } while (count > 0);
170     }
171 
wakeup()172     private void wakeup() {
173         try {
174             postQueuedCompletionStatus(port, 0);
175         } catch (IOException e) {
176             // should not happen
177             throw new AssertionError(e);
178         }
179     }
180 
181     @Override
executeOnHandlerTask(Runnable task)182     void executeOnHandlerTask(Runnable task) {
183         synchronized (this) {
184             if (closed)
185                 throw new RejectedExecutionException();
186             offerTask(task);
187             wakeup();
188         }
189 
190     }
191 
192     @Override
shutdownHandlerTasks()193     void shutdownHandlerTasks() {
194         // shutdown all handler threads
195         int nThreads = threadCount();
196         while (nThreads-- > 0) {
197             wakeup();
198         }
199     }
200 
201     /**
202      * Associate the given handle with this group
203      */
associate(OverlappedChannel ch, long handle)204     int associate(OverlappedChannel ch, long handle) throws IOException {
205         keyToChannelLock.writeLock().lock();
206 
207         // generate a completion key (if not shutdown)
208         int key;
209         try {
210             if (isShutdown())
211                 throw new ShutdownChannelGroupException();
212 
213             // generate unique key
214             do {
215                 key = nextCompletionKey++;
216             } while ((key == 0) || keyToChannel.containsKey(key));
217 
218             // associate with I/O completion port
219             if (handle != 0L) {
220                 createIoCompletionPort(handle, port, key, 0);
221             }
222 
223             // setup mapping
224             keyToChannel.put(key, ch);
225         } finally {
226             keyToChannelLock.writeLock().unlock();
227         }
228         return key;
229     }
230 
231     /**
232      * Disassociate channel from the group.
233      */
disassociate(int key)234     void disassociate(int key) {
235         boolean checkForShutdown = false;
236 
237         keyToChannelLock.writeLock().lock();
238         try {
239             keyToChannel.remove(key);
240 
241             // last key to be removed so check if group is shutdown
242             if (keyToChannel.isEmpty())
243                 checkForShutdown = true;
244 
245         } finally {
246             keyToChannelLock.writeLock().unlock();
247         }
248 
249         // continue shutdown
250         if (checkForShutdown && isShutdown()) {
251             try {
252                 shutdownNow();
253             } catch (IOException ignore) { }
254         }
255     }
256 
257     /**
258      * Invoked when a channel associated with this port is closed before
259      * notifications for all outstanding I/O operations have been received.
260      */
makeStale(Long overlapped)261     void makeStale(Long overlapped) {
262         synchronized (staleIoSet) {
263             staleIoSet.add(overlapped);
264         }
265     }
266 
267     /**
268      * Checks if the given OVERLAPPED is stale and if so, releases it.
269      */
checkIfStale(long ov)270     private void checkIfStale(long ov) {
271         synchronized (staleIoSet) {
272             boolean removed = staleIoSet.remove(ov);
273             if (removed) {
274                 unsafe.freeMemory(ov);
275             }
276         }
277     }
278 
279     /**
280      * The handler for consuming the result of an asynchronous I/O operation.
281      */
282     static interface ResultHandler {
283         /**
284          * Invoked if the I/O operation completes successfully.
285          */
completed(int bytesTransferred, boolean canInvokeDirect)286         public void completed(int bytesTransferred, boolean canInvokeDirect);
287 
288         /**
289          * Invoked if the I/O operation fails.
290          */
failed(int error, IOException ioe)291         public void failed(int error, IOException ioe);
292     }
293 
294     // Creates IOException for the given I/O error.
translateErrorToIOException(int error)295     private static IOException translateErrorToIOException(int error) {
296         String msg = getErrorMessage(error);
297         if (msg == null)
298             msg = "Unknown error: 0x0" + Integer.toHexString(error);
299         return new IOException(msg);
300     }
301 
302     /**
303      * Long-running task servicing system-wide or per-file completion port
304      */
305     private class EventHandlerTask implements Runnable {
run()306         public void run() {
307             Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
308                 Invoker.getGroupAndInvokeCount();
309             boolean canInvokeDirect = (myGroupAndInvokeCount != null);
310             CompletionStatus ioResult = new CompletionStatus();
311             boolean replaceMe = false;
312 
313             try {
314                 for (;;) {
315                     // reset invoke count
316                     if (myGroupAndInvokeCount != null)
317                         myGroupAndInvokeCount.resetInvokeCount();
318 
319                     // wait for I/O completion event
320                     // A error here is fatal (thread will not be replaced)
321                     replaceMe = false;
322                     try {
323                         getQueuedCompletionStatus(port, ioResult);
324                     } catch (IOException x) {
325                         // should not happen
326                         x.printStackTrace();
327                         return;
328                     }
329 
330                     // handle wakeup to execute task or shutdown
331                     if (ioResult.completionKey() == 0 &&
332                         ioResult.overlapped() == 0L)
333                     {
334                         Runnable task = pollTask();
335                         if (task == null) {
336                             // shutdown request
337                             return;
338                         }
339 
340                         // run task
341                         // (if error/exception then replace thread)
342                         replaceMe = true;
343                         task.run();
344                         continue;
345                     }
346 
347                     // map key to channel
348                     OverlappedChannel ch = null;
349                     keyToChannelLock.readLock().lock();
350                     try {
351                         ch = keyToChannel.get(ioResult.completionKey());
352                         if (ch == null) {
353                             checkIfStale(ioResult.overlapped());
354                             continue;
355                         }
356                     } finally {
357                         keyToChannelLock.readLock().unlock();
358                     }
359 
360                     // lookup I/O request
361                     PendingFuture<?,?> result = ch.getByOverlapped(ioResult.overlapped());
362                     if (result == null) {
363                         // we get here if the OVERLAPPED structure is associated
364                         // with an I/O operation on a channel that was closed
365                         // but the I/O operation event wasn't read in a timely
366                         // manner. Alternatively, it may be related to a
367                         // tryLock operation as the OVERLAPPED structures for
368                         // these operations are not in the I/O cache.
369                         checkIfStale(ioResult.overlapped());
370                         continue;
371                     }
372 
373                     // synchronize on result in case I/O completed immediately
374                     // and was handled by initiator
375                     synchronized (result) {
376                         if (result.isDone()) {
377                             continue;
378                         }
379                         // not handled by initiator
380                     }
381 
382                     // invoke I/O result handler
383                     int error = ioResult.error();
384                     ResultHandler rh = (ResultHandler)result.getContext();
385                     replaceMe = true; // (if error/exception then replace thread)
386                     if (error == 0) {
387                         rh.completed(ioResult.bytesTransferred(), canInvokeDirect);
388                     } else {
389                         rh.failed(error, translateErrorToIOException(error));
390                     }
391                 }
392             } finally {
393                 // last thread to exit when shutdown releases resources
394                 int remaining = threadExit(this, replaceMe);
395                 if (remaining == 0 && isShutdown()) {
396                     implClose();
397                 }
398             }
399         }
400     }
401 
402     /**
403      * Container for data returned by GetQueuedCompletionStatus
404      */
405     private static class CompletionStatus {
406         private int error;
407         private int bytesTransferred;
408         private int completionKey;
409         private long overlapped;
410 
CompletionStatus()411         private CompletionStatus() { }
error()412         int error() { return error; }
bytesTransferred()413         int bytesTransferred() { return bytesTransferred; }
completionKey()414         int completionKey() { return completionKey; }
overlapped()415         long overlapped() { return overlapped; }
416     }
417 
418     // -- native methods --
419 
initIDs()420     private static native void initIDs();
421 
createIoCompletionPort(long handle, long existingPort, int completionKey, int concurrency)422     private static native long createIoCompletionPort(long handle,
423         long existingPort, int completionKey, int concurrency) throws IOException;
424 
close0(long handle)425     private static native void close0(long handle);
426 
getQueuedCompletionStatus(long completionPort, CompletionStatus status)427     private static native void getQueuedCompletionStatus(long completionPort,
428         CompletionStatus status) throws IOException;
429 
postQueuedCompletionStatus(long completionPort, int completionKey)430     private static native void postQueuedCompletionStatus(long completionPort,
431         int completionKey) throws IOException;
432 
getErrorMessage(int error)433     private static native String getErrorMessage(int error);
434 
435     static {
IOUtil.load()436         IOUtil.load();
initIDs()437         initIDs();
438     }
439 }
440