1 /*
2  * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package sun.nio.ch;
27 
28 import java.nio.channels.*;
29 import java.nio.ByteBuffer;
30 import java.nio.BufferOverflowException;
31 import java.net.*;
32 import java.util.concurrent.*;
33 import java.io.IOException;
34 import java.security.AccessController;
35 import java.security.PrivilegedActionException;
36 import java.security.PrivilegedExceptionAction;
37 import jdk.internal.misc.Unsafe;
38 import sun.net.util.SocketExceptions;
39 
40 /**
41  * Windows implementation of AsynchronousSocketChannel using overlapped I/O.
42  */
43 
44 class WindowsAsynchronousSocketChannelImpl
45     extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel
46 {
47     private static final Unsafe unsafe = Unsafe.getUnsafe();
48     private static int addressSize = unsafe.addressSize();
49 
dependsArch(int value32, int value64)50     private static int dependsArch(int value32, int value64) {
51         return (addressSize == 4) ? value32 : value64;
52     }
53 
    /*
     * Layout of the native buffer descriptor written by prepareBuffers:
     *
     * typedef struct _WSABUF {
     *     u_long      len;
     *     char FAR *  buf;
     * } WSABUF;
     *
     * The structure size and the offset of buf are chosen by dependsArch
     * from the native pointer size.
     */
    private static final int SIZEOF_WSABUF  = dependsArch(8, 16);
    private static final int OFFSETOF_LEN   = 0;
    private static final int OFFSETOF_BUF   = dependsArch(4, 8);

    // maximum vector size for scatter/gather I/O; read and write tasks use
    // at most this many of the buffers they are given
    private static final int MAX_WSABUF     = 16;

    // size in bytes of an array holding MAX_WSABUF structures
    private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF;
68 
69 
    // socket handle. Use begin()/end() around each usage of this handle.
    final long handle;

    // I/O completion port that the socket is associated with
    private final Iocp iocp;

    // completion key to identify channel when I/O completes; remains 0 when
    // the constructor tolerated a ShutdownChannelGroupException
    private final int completionKey;

    // Pending I/O operations are tied to an OVERLAPPED structure that can only
    // be released when the I/O completion event is posted to the completion
    // port. Where I/O operations complete immediately then it is possible
    // there may be more than two OVERLAPPED structures in use.
    private final PendingIoCache ioCache;

    // per-channel arrays of WSABUF structures (SIZEOF_WSABUFARRAY bytes
    // each); allocated in the constructor and freed by implClose
    private final long readBufferArray;
    private final long writeBufferArray;
88 
89 
WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)90     WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)
91         throws IOException
92     {
93         super(iocp);
94 
95         // associate socket with default completion port
96         long h = IOUtil.fdVal(fd);
97         int key = 0;
98         try {
99             key = iocp.associate(this, h);
100         } catch (ShutdownChannelGroupException x) {
101             if (failIfGroupShutdown) {
102                 closesocket0(h);
103                 throw x;
104             }
105         } catch (IOException x) {
106             closesocket0(h);
107             throw x;
108         }
109 
110         this.handle = h;
111         this.iocp = iocp;
112         this.completionKey = key;
113         this.ioCache = new PendingIoCache();
114 
115         // allocate WSABUF arrays
116         this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
117         this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
118     }
119 
    /**
     * Creates the channel, failing if the group is shutdown; equivalent to
     * the two-argument constructor with {@code failIfGroupShutdown} set to
     * {@code true}.
     *
     * @param iocp the completion port to associate the socket with
     * @throws IOException if thrown by the two-argument constructor
     */
    WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {
        this(iocp, true);
    }
123 
    /**
     * Returns the I/O completion port that this channel was created with.
     */
    @Override
    public AsynchronousChannelGroupImpl group() {
        return iocp;
    }
128 
129     /**
130      * Invoked by Iocp when an I/O operation competes.
131      */
132     @Override
getByOverlapped(long overlapped)133     public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
134         return ioCache.remove(overlapped);
135     }
136 
    /**
     * Returns the socket handle of this channel.
     * Invoked by WindowsAsynchronousServerSocketChannelImpl.
     */
    long handle() {
        return handle;
    }
141 
    /**
     * Invoked by WindowsAsynchronousServerSocketChannelImpl when a new
     * connection is accepted. Marks the channel as connected and records both
     * addresses while holding stateLock.
     *
     * @param localAddress  the local address to record
     * @param remoteAddress the remote address to record
     */
    void setConnected(InetSocketAddress localAddress,
                      InetSocketAddress remoteAddress)
    {
        synchronized (stateLock) {
            state = ST_CONNECTED;
            this.localAddress = localAddress;
            this.remoteAddress = remoteAddress;
        }
    }
153 
154     @Override
implClose()155     void implClose() throws IOException {
156         // close socket (may cause outstanding async I/O operations to fail).
157         closesocket0(handle);
158 
159         // waits until all I/O operations have completed
160         ioCache.close();
161 
162         // release arrays of WSABUF structures
163         unsafe.freeMemory(readBufferArray);
164         unsafe.freeMemory(writeBufferArray);
165 
166         // finally disassociate from the completion port (key can be 0 if
167         // channel created when group is shutdown)
168         if (completionKey != 0)
169             iocp.disassociate(completionKey);
170     }
171 
172     @Override
onCancel(PendingFuture<?,?> task)173     public void onCancel(PendingFuture<?,?> task) {
174         if (task.getContext() instanceof ConnectTask)
175             killConnect();
176         if (task.getContext() instanceof ReadTask)
177             killReading();
178         if (task.getContext() instanceof WriteTask)
179             killWriting();
180     }
181 
182     /**
183      * Implements the task to initiate a connection and the handler to
184      * consume the result when the connection is established (or fails).
185      */
186     private class ConnectTask<A> implements Runnable, Iocp.ResultHandler {
187         private final InetSocketAddress remote;
188         private final PendingFuture<Void,A> result;
189 
ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result)190         ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) {
191             this.remote = remote;
192             this.result = result;
193         }
194 
closeChannel()195         private void closeChannel() {
196             try {
197                 close();
198             } catch (IOException ignore) { }
199         }
200 
toIOException(Throwable x)201         private IOException toIOException(Throwable x) {
202             if (x instanceof IOException) {
203                 if (x instanceof ClosedChannelException)
204                     x = new AsynchronousCloseException();
205                 return (IOException)x;
206             }
207             return new IOException(x);
208         }
209 
210         /**
211          * Invoke after a connection is successfully established.
212          */
afterConnect()213         private void afterConnect() throws IOException {
214             updateConnectContext(handle);
215             synchronized (stateLock) {
216                 state = ST_CONNECTED;
217                 remoteAddress = remote;
218             }
219         }
220 
221         /**
222          * Task to initiate a connection.
223          */
224         @Override
run()225         public void run() {
226             long overlapped = 0L;
227             Throwable exc = null;
228             try {
229                 begin();
230 
231                 // synchronize on result to allow this thread handle the case
232                 // where the connection is established immediately.
233                 synchronized (result) {
234                     overlapped = ioCache.add(result);
235                     // initiate the connection
236                     int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(),
237                                      remote.getPort(), overlapped);
238                     if (n == IOStatus.UNAVAILABLE) {
239                         // connection is pending
240                         return;
241                     }
242 
243                     // connection established immediately
244                     afterConnect();
245                     result.setResult(null);
246                 }
247             } catch (Throwable x) {
248                 if (overlapped != 0L)
249                     ioCache.remove(overlapped);
250                 exc = x;
251             } finally {
252                 end();
253             }
254 
255             if (exc != null) {
256                 closeChannel();
257                 exc = SocketExceptions.of(toIOException(exc), remote);
258                 result.setFailure(exc);
259             }
260             Invoker.invoke(result);
261         }
262 
263         /**
264          * Invoked by handler thread when connection established.
265          */
266         @Override
completed(int bytesTransferred, boolean canInvokeDirect)267         public void completed(int bytesTransferred, boolean canInvokeDirect) {
268             Throwable exc = null;
269             try {
270                 begin();
271                 afterConnect();
272                 result.setResult(null);
273             } catch (Throwable x) {
274                 // channel is closed or unable to finish connect
275                 exc = x;
276             } finally {
277                 end();
278             }
279 
280             // can't close channel while in begin/end block
281             if (exc != null) {
282                 closeChannel();
283                 IOException ee = toIOException(exc);
284                 ee = SocketExceptions.of(ee, remote);
285                 result.setFailure(ee);
286             }
287 
288             if (canInvokeDirect) {
289                 Invoker.invokeUnchecked(result);
290             } else {
291                 Invoker.invoke(result);
292             }
293         }
294 
295         /**
296          * Invoked by handler thread when failed to establish connection.
297          */
298         @Override
failed(int error, IOException x)299         public void failed(int error, IOException x) {
300             x = SocketExceptions.of(x, remote);
301             if (isOpen()) {
302                 closeChannel();
303                 result.setFailure(x);
304             } else {
305                 x = SocketExceptions.of(new AsynchronousCloseException(), remote);
306                 result.setFailure(x);
307             }
308             Invoker.invoke(result);
309         }
310     }
311 
doPrivilegedBind(final SocketAddress sa)312     private void doPrivilegedBind(final SocketAddress sa) throws IOException {
313         try {
314             AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
315                 public Void run() throws IOException {
316                     bind(sa);
317                     return null;
318                 }
319             });
320         } catch (PrivilegedActionException e) {
321             throw (IOException) e.getException();
322         }
323     }
324 
325     @Override
implConnect(SocketAddress remote, A attachment, CompletionHandler<Void,? super A> handler)326     <A> Future<Void> implConnect(SocketAddress remote,
327                                  A attachment,
328                                  CompletionHandler<Void,? super A> handler)
329     {
330         if (!isOpen()) {
331             Throwable exc = new ClosedChannelException();
332             if (handler == null)
333                 return CompletedFuture.withFailure(exc);
334             Invoker.invoke(this, handler, attachment, null, exc);
335             return null;
336         }
337 
338         InetSocketAddress isa = Net.checkAddress(remote);
339 
340         // permission check
341         SecurityManager sm = System.getSecurityManager();
342         if (sm != null)
343             sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
344 
345         // check and update state
346         // ConnectEx requires the socket to be bound to a local address
347         IOException bindException = null;
348         synchronized (stateLock) {
349             if (state == ST_CONNECTED)
350                 throw new AlreadyConnectedException();
351             if (state == ST_PENDING)
352                 throw new ConnectionPendingException();
353             if (localAddress == null) {
354                 try {
355                     SocketAddress any = new InetSocketAddress(0);
356                     if (sm == null) {
357                         bind(any);
358                     } else {
359                         doPrivilegedBind(any);
360                     }
361                 } catch (IOException x) {
362                     bindException = x;
363                 }
364             }
365             if (bindException == null)
366                 state = ST_PENDING;
367         }
368 
369         // handle bind failure
370         if (bindException != null) {
371             try {
372                 close();
373             } catch (IOException ignore) { }
374             if (handler == null)
375                 return CompletedFuture.withFailure(bindException);
376             Invoker.invoke(this, handler, attachment, null, bindException);
377             return null;
378         }
379 
380         // setup task
381         PendingFuture<Void,A> result =
382             new PendingFuture<Void,A>(this, handler, attachment);
383         ConnectTask<A> task = new ConnectTask<A>(isa, result);
384         result.setContext(task);
385 
386         // initiate I/O
387         task.run();
388         return result;
389     }
390 
391     /**
392      * Implements the task to initiate a read and the handler to consume the
393      * result when the read completes.
394      */
395     private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler {
396         private final ByteBuffer[] bufs;
397         private final int numBufs;
398         private final boolean scatteringRead;
399         private final PendingFuture<V,A> result;
400 
401         // set by run method
402         private ByteBuffer[] shadow;
403 
ReadTask(ByteBuffer[] bufs, boolean scatteringRead, PendingFuture<V,A> result)404         ReadTask(ByteBuffer[] bufs,
405                  boolean scatteringRead,
406                  PendingFuture<V,A> result)
407         {
408             this.bufs = bufs;
409             this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
410             this.scatteringRead = scatteringRead;
411             this.result = result;
412         }
413 
414         /**
415          * Invoked prior to read to prepare the WSABUF array. Where necessary,
416          * it substitutes non-direct buffers with direct buffers.
417          */
prepareBuffers()418         void prepareBuffers() {
419             shadow = new ByteBuffer[numBufs];
420             long address = readBufferArray;
421             for (int i=0; i<numBufs; i++) {
422                 ByteBuffer dst = bufs[i];
423                 int pos = dst.position();
424                 int lim = dst.limit();
425                 assert (pos <= lim);
426                 int rem = (pos <= lim ? lim - pos : 0);
427                 long a;
428                 if (!(dst instanceof DirectBuffer)) {
429                     // substitute with direct buffer
430                     ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
431                     shadow[i] = bb;
432                     a = ((DirectBuffer)bb).address();
433                 } else {
434                     shadow[i] = dst;
435                     a = ((DirectBuffer)dst).address() + pos;
436                 }
437                 unsafe.putAddress(address + OFFSETOF_BUF, a);
438                 unsafe.putInt(address + OFFSETOF_LEN, rem);
439                 address += SIZEOF_WSABUF;
440             }
441         }
442 
443         /**
444          * Invoked after a read has completed to update the buffer positions
445          * and release any substituted buffers.
446          */
updateBuffers(int bytesRead)447         void updateBuffers(int bytesRead) {
448             for (int i=0; i<numBufs; i++) {
449                 ByteBuffer nextBuffer = shadow[i];
450                 int pos = nextBuffer.position();
451                 int len = nextBuffer.remaining();
452                 if (bytesRead >= len) {
453                     bytesRead -= len;
454                     int newPosition = pos + len;
455                     try {
456                         nextBuffer.position(newPosition);
457                     } catch (IllegalArgumentException x) {
458                         // position changed by another
459                     }
460                 } else { // Buffers not completely filled
461                     if (bytesRead > 0) {
462                         assert(pos + bytesRead < (long)Integer.MAX_VALUE);
463                         int newPosition = pos + bytesRead;
464                         try {
465                             nextBuffer.position(newPosition);
466                         } catch (IllegalArgumentException x) {
467                             // position changed by another
468                         }
469                     }
470                     break;
471                 }
472             }
473 
474             // Put results from shadow into the slow buffers
475             for (int i=0; i<numBufs; i++) {
476                 if (!(bufs[i] instanceof DirectBuffer)) {
477                     shadow[i].flip();
478                     try {
479                         bufs[i].put(shadow[i]);
480                     } catch (BufferOverflowException x) {
481                         // position changed by another
482                     }
483                 }
484             }
485         }
486 
releaseBuffers()487         void releaseBuffers() {
488             for (int i=0; i<numBufs; i++) {
489                 if (!(bufs[i] instanceof DirectBuffer)) {
490                     Util.releaseTemporaryDirectBuffer(shadow[i]);
491                 }
492             }
493         }
494 
495         @Override
496         @SuppressWarnings("unchecked")
run()497         public void run() {
498             long overlapped = 0L;
499             boolean prepared = false;
500             boolean pending = false;
501 
502             try {
503                 begin();
504 
505                 // substitute non-direct buffers
506                 prepareBuffers();
507                 prepared = true;
508 
509                 // get an OVERLAPPED structure (from the cache or allocate)
510                 overlapped = ioCache.add(result);
511 
512                 // initiate read
513                 int n = read0(handle, numBufs, readBufferArray, overlapped);
514                 if (n == IOStatus.UNAVAILABLE) {
515                     // I/O is pending
516                     pending = true;
517                     return;
518                 }
519                 if (n == IOStatus.EOF) {
520                     // input shutdown
521                     enableReading();
522                     if (scatteringRead) {
523                         result.setResult((V)Long.valueOf(-1L));
524                     } else {
525                         result.setResult((V)Integer.valueOf(-1));
526                     }
527                 } else {
528                     throw new InternalError("Read completed immediately");
529                 }
530             } catch (Throwable x) {
531                 // failed to initiate read
532                 // reset read flag before releasing waiters
533                 enableReading();
534                 if (x instanceof ClosedChannelException)
535                     x = new AsynchronousCloseException();
536                 if (!(x instanceof IOException))
537                     x = new IOException(x);
538                 result.setFailure(x);
539             } finally {
540                 // release resources if I/O not pending
541                 if (!pending) {
542                     if (overlapped != 0L)
543                         ioCache.remove(overlapped);
544                     if (prepared)
545                         releaseBuffers();
546                 }
547                 end();
548             }
549 
550             // invoke completion handler
551             Invoker.invoke(result);
552         }
553 
554         /**
555          * Executed when the I/O has completed
556          */
557         @Override
558         @SuppressWarnings("unchecked")
completed(int bytesTransferred, boolean canInvokeDirect)559         public void completed(int bytesTransferred, boolean canInvokeDirect) {
560             if (bytesTransferred == 0) {
561                 bytesTransferred = -1;  // EOF
562             } else {
563                 updateBuffers(bytesTransferred);
564             }
565 
566             // return direct buffer to cache if substituted
567             releaseBuffers();
568 
569             // release waiters if not already released by timeout
570             synchronized (result) {
571                 if (result.isDone())
572                     return;
573                 enableReading();
574                 if (scatteringRead) {
575                     result.setResult((V)Long.valueOf(bytesTransferred));
576                 } else {
577                     result.setResult((V)Integer.valueOf(bytesTransferred));
578                 }
579             }
580             if (canInvokeDirect) {
581                 Invoker.invokeUnchecked(result);
582             } else {
583                 Invoker.invoke(result);
584             }
585         }
586 
587         @Override
failed(int error, IOException x)588         public void failed(int error, IOException x) {
589             // return direct buffer to cache if substituted
590             releaseBuffers();
591 
592             // release waiters if not already released by timeout
593             if (!isOpen())
594                 x = new AsynchronousCloseException();
595 
596             synchronized (result) {
597                 if (result.isDone())
598                     return;
599                 enableReading();
600                 result.setFailure(x);
601             }
602             Invoker.invoke(result);
603         }
604 
605         /**
606          * Invoked if timeout expires before it is cancelled
607          */
timeout()608         void timeout() {
609             // synchronize on result as the I/O could complete/fail
610             synchronized (result) {
611                 if (result.isDone())
612                     return;
613 
614                 // kill further reading before releasing waiters
615                 enableReading(true);
616                 result.setFailure(new InterruptedByTimeoutException());
617             }
618 
619             // invoke handler without any locks
620             Invoker.invoke(result);
621         }
622     }
623 
624     @Override
implRead(boolean isScatteringRead, ByteBuffer dst, ByteBuffer[] dsts, long timeout, TimeUnit unit, A attachment, CompletionHandler<V,? super A> handler)625     <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
626                                             ByteBuffer dst,
627                                             ByteBuffer[] dsts,
628                                             long timeout,
629                                             TimeUnit unit,
630                                             A attachment,
631                                             CompletionHandler<V,? super A> handler)
632     {
633         // setup task
634         PendingFuture<V,A> result =
635             new PendingFuture<V,A>(this, handler, attachment);
636         ByteBuffer[] bufs;
637         if (isScatteringRead) {
638             bufs = dsts;
639         } else {
640             bufs = new ByteBuffer[1];
641             bufs[0] = dst;
642         }
643         final ReadTask<V,A> readTask =
644                 new ReadTask<V,A>(bufs, isScatteringRead, result);
645         result.setContext(readTask);
646 
647         // schedule timeout
648         if (timeout > 0L) {
649             Future<?> timeoutTask = iocp.schedule(new Runnable() {
650                 public void run() {
651                     readTask.timeout();
652                 }
653             }, timeout, unit);
654             result.setTimeoutTask(timeoutTask);
655         }
656 
657         // initiate I/O
658         readTask.run();
659         return result;
660     }
661 
662     /**
663      * Implements the task to initiate a write and the handler to consume the
664      * result when the write completes.
665      */
666     private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler {
        // source buffers supplied by the caller
        private final ByteBuffer[] bufs;
        // number of leading buffers that are used, capped at MAX_WSABUF
        private final int numBufs;
        // true: results are reported as Long; false: as Integer
        private final boolean gatheringWrite;
        private final PendingFuture<V,A> result;

        // set by run method (via prepareBuffers): shadow[i] is the buffer
        // whose memory is described to write0 on behalf of bufs[i]
        private ByteBuffer[] shadow;
674 
WriteTask(ByteBuffer[] bufs, boolean gatheringWrite, PendingFuture<V,A> result)675         WriteTask(ByteBuffer[] bufs,
676                   boolean gatheringWrite,
677                   PendingFuture<V,A> result)
678         {
679             this.bufs = bufs;
680             this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
681             this.gatheringWrite = gatheringWrite;
682             this.result = result;
683         }
684 
685         /**
686          * Invoked prior to write to prepare the WSABUF array. Where necessary,
687          * it substitutes non-direct buffers with direct buffers.
688          */
prepareBuffers()689         void prepareBuffers() {
690             shadow = new ByteBuffer[numBufs];
691             long address = writeBufferArray;
692             for (int i=0; i<numBufs; i++) {
693                 ByteBuffer src = bufs[i];
694                 int pos = src.position();
695                 int lim = src.limit();
696                 assert (pos <= lim);
697                 int rem = (pos <= lim ? lim - pos : 0);
698                 long a;
699                 if (!(src instanceof DirectBuffer)) {
700                     // substitute with direct buffer
701                     ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
702                     bb.put(src);
703                     bb.flip();
704                     src.position(pos);  // leave heap buffer untouched for now
705                     shadow[i] = bb;
706                     a = ((DirectBuffer)bb).address();
707                 } else {
708                     shadow[i] = src;
709                     a = ((DirectBuffer)src).address() + pos;
710                 }
711                 unsafe.putAddress(address + OFFSETOF_BUF, a);
712                 unsafe.putInt(address + OFFSETOF_LEN, rem);
713                 address += SIZEOF_WSABUF;
714             }
715         }
716 
717         /**
718          * Invoked after a write has completed to update the buffer positions
719          * and release any substituted buffers.
720          */
updateBuffers(int bytesWritten)721         void updateBuffers(int bytesWritten) {
722             // Notify the buffers how many bytes were taken
723             for (int i=0; i<numBufs; i++) {
724                 ByteBuffer nextBuffer = bufs[i];
725                 int pos = nextBuffer.position();
726                 int lim = nextBuffer.limit();
727                 int len = (pos <= lim ? lim - pos : lim);
728                 if (bytesWritten >= len) {
729                     bytesWritten -= len;
730                     int newPosition = pos + len;
731                     try {
732                         nextBuffer.position(newPosition);
733                     } catch (IllegalArgumentException x) {
734                         // position changed by someone else
735                     }
736                 } else { // Buffers not completely filled
737                     if (bytesWritten > 0) {
738                         assert(pos + bytesWritten < (long)Integer.MAX_VALUE);
739                         int newPosition = pos + bytesWritten;
740                         try {
741                             nextBuffer.position(newPosition);
742                         } catch (IllegalArgumentException x) {
743                             // position changed by someone else
744                         }
745                     }
746                     break;
747                 }
748             }
749         }
750 
releaseBuffers()751         void releaseBuffers() {
752             for (int i=0; i<numBufs; i++) {
753                 if (!(bufs[i] instanceof DirectBuffer)) {
754                     Util.releaseTemporaryDirectBuffer(shadow[i]);
755                 }
756             }
757         }
758 
759         @Override
760         //@SuppressWarnings("unchecked")
run()761         public void run() {
762             long overlapped = 0L;
763             boolean prepared = false;
764             boolean pending = false;
765             boolean shutdown = false;
766 
767             try {
768                 begin();
769 
770                 // substitute non-direct buffers
771                 prepareBuffers();
772                 prepared = true;
773 
774                 // get an OVERLAPPED structure (from the cache or allocate)
775                 overlapped = ioCache.add(result);
776                 int n = write0(handle, numBufs, writeBufferArray, overlapped);
777                 if (n == IOStatus.UNAVAILABLE) {
778                     // I/O is pending
779                     pending = true;
780                     return;
781                 }
782                 if (n == IOStatus.EOF) {
783                     // special case for shutdown output
784                     shutdown = true;
785                     throw new ClosedChannelException();
786                 }
787                 // write completed immediately
788                 throw new InternalError("Write completed immediately");
789             } catch (Throwable x) {
790                 // write failed. Enable writing before releasing waiters.
791                 enableWriting();
792                 if (!shutdown && (x instanceof ClosedChannelException))
793                     x = new AsynchronousCloseException();
794                 if (!(x instanceof IOException))
795                     x = new IOException(x);
796                 result.setFailure(x);
797             } finally {
798                 // release resources if I/O not pending
799                 if (!pending) {
800                     if (overlapped != 0L)
801                         ioCache.remove(overlapped);
802                     if (prepared)
803                         releaseBuffers();
804                 }
805                 end();
806             }
807 
808             // invoke completion handler
809             Invoker.invoke(result);
810         }
811 
812         /**
813          * Executed when the I/O has completed
814          */
815         @Override
816         @SuppressWarnings("unchecked")
completed(int bytesTransferred, boolean canInvokeDirect)817         public void completed(int bytesTransferred, boolean canInvokeDirect) {
818             updateBuffers(bytesTransferred);
819 
820             // return direct buffer to cache if substituted
821             releaseBuffers();
822 
823             // release waiters if not already released by timeout
824             synchronized (result) {
825                 if (result.isDone())
826                     return;
827                 enableWriting();
828                 if (gatheringWrite) {
829                     result.setResult((V)Long.valueOf(bytesTransferred));
830                 } else {
831                     result.setResult((V)Integer.valueOf(bytesTransferred));
832                 }
833             }
834             if (canInvokeDirect) {
835                 Invoker.invokeUnchecked(result);
836             } else {
837                 Invoker.invoke(result);
838             }
839         }
840 
841         @Override
failed(int error, IOException x)842         public void failed(int error, IOException x) {
843             // return direct buffer to cache if substituted
844             releaseBuffers();
845 
846             // release waiters if not already released by timeout
847             if (!isOpen())
848                 x = new AsynchronousCloseException();
849 
850             synchronized (result) {
851                 if (result.isDone())
852                     return;
853                 enableWriting();
854                 result.setFailure(x);
855             }
856             Invoker.invoke(result);
857         }
858 
859         /**
860          * Invoked if timeout expires before it is cancelled
861          */
timeout()862         void timeout() {
863             // synchronize on result as the I/O could complete/fail
864             synchronized (result) {
865                 if (result.isDone())
866                     return;
867 
868                 // kill further writing before releasing waiters
869                 enableWriting(true);
870                 result.setFailure(new InterruptedByTimeoutException());
871             }
872 
873             // invoke handler without any locks
874             Invoker.invoke(result);
875         }
876     }
877 
878     @Override
implWrite(boolean gatheringWrite, ByteBuffer src, ByteBuffer[] srcs, long timeout, TimeUnit unit, A attachment, CompletionHandler<V,? super A> handler)879     <V extends Number,A> Future<V> implWrite(boolean gatheringWrite,
880                                              ByteBuffer src,
881                                              ByteBuffer[] srcs,
882                                              long timeout,
883                                              TimeUnit unit,
884                                              A attachment,
885                                              CompletionHandler<V,? super A> handler)
886     {
887         // setup task
888         PendingFuture<V,A> result =
889             new PendingFuture<V,A>(this, handler, attachment);
890         ByteBuffer[] bufs;
891         if (gatheringWrite) {
892             bufs = srcs;
893         } else {
894             bufs = new ByteBuffer[1];
895             bufs[0] = src;
896         }
897         final WriteTask<V,A> writeTask =
898                 new WriteTask<V,A>(bufs, gatheringWrite, result);
899         result.setContext(writeTask);
900 
901         // schedule timeout
902         if (timeout > 0L) {
903             Future<?> timeoutTask = iocp.schedule(new Runnable() {
904                 public void run() {
905                     writeTask.timeout();
906                 }
907             }, timeout, unit);
908             result.setTimeoutTask(timeoutTask);
909         }
910 
911         // initiate I/O
912         writeTask.run();
913         return result;
914     }
915 
916     // -- Native methods --
917 
initIDs()918     private static native void initIDs();
919 
connect0(long socket, boolean preferIPv6, InetAddress remote, int remotePort, long overlapped)920     private static native int connect0(long socket, boolean preferIPv6,
921         InetAddress remote, int remotePort, long overlapped) throws IOException;
922 
updateConnectContext(long socket)923     private static native void updateConnectContext(long socket) throws IOException;
924 
read0(long socket, int count, long addres, long overlapped)925     private static native int read0(long socket, int count, long addres, long overlapped)
926         throws IOException;
927 
write0(long socket, int count, long address, long overlapped)928     private static native int write0(long socket, int count, long address,
929         long overlapped) throws IOException;
930 
shutdown0(long socket, int how)931     private static native void shutdown0(long socket, int how) throws IOException;
932 
closesocket0(long socket)933     private static native void closesocket0(long socket) throws IOException;
934 
    // Class initialization: IOUtil.load() runs first, then the native
    // initIDs() hook declared in this class.
    // NOTE(review): IOUtil.load() presumably loads the native library that
    // provides this class's native methods, so the order matters -- confirm.
    static {
        IOUtil.load();
        initIDs();
    }
939 }
940