1 /*
2  * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package sun.nio.ch;
27 
28 import java.nio.channels.*;
29 import java.nio.ByteBuffer;
30 import java.nio.BufferOverflowException;
31 import java.net.*;
32 import java.util.concurrent.*;
33 import java.io.IOException;
34 import java.security.AccessController;
35 import java.security.PrivilegedActionException;
36 import java.security.PrivilegedExceptionAction;
37 import sun.misc.Unsafe;
38 
39 /**
40  * Windows implementation of AsynchronousSocketChannel using overlapped I/O.
41  */
42 
43 class WindowsAsynchronousSocketChannelImpl
44     extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel
45 {
46     private static final Unsafe unsafe = Unsafe.getUnsafe();
47     private static int addressSize = unsafe.addressSize();
48 
dependsArch(int value32, int value64)49     private static int dependsArch(int value32, int value64) {
50         return (addressSize == 4) ? value32 : value64;
51     }
52 
53     /*
54      * typedef struct _WSABUF {
55      *     u_long      len;
56      *     char FAR *  buf;
57      * } WSABUF;
58      */
59     private static final int SIZEOF_WSABUF  = dependsArch(8, 16);
60     private static final int OFFSETOF_LEN   = 0;
61     private static final int OFFSETOF_BUF   = dependsArch(4, 8);
62 
63     // maximum vector size for scatter/gather I/O
64     private static final int MAX_WSABUF     = 16;
65 
66     private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF;
67 
68 
69     // socket handle. Use begin()/end() around each usage of this handle.
70     final long handle;
71 
72     // I/O completion port that the socket is associated with
73     private final Iocp iocp;
74 
75     // completion key to identify channel when I/O completes
76     private final int completionKey;
77 
78     // Pending I/O operations are tied to an OVERLAPPED structure that can only
79     // be released when the I/O completion event is posted to the completion
80     // port. Where I/O operations complete immediately then it is possible
81     // there may be more than two OVERLAPPED structures in use.
82     private final PendingIoCache ioCache;
83 
84     // per-channel arrays of WSABUF structures
85     private final long readBufferArray;
86     private final long writeBufferArray;
87 
88 
WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)89     WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)
90         throws IOException
91     {
92         super(iocp);
93 
94         // associate socket with default completion port
95         long h = IOUtil.fdVal(fd);
96         int key = 0;
97         try {
98             key = iocp.associate(this, h);
99         } catch (ShutdownChannelGroupException x) {
100             if (failIfGroupShutdown) {
101                 closesocket0(h);
102                 throw x;
103             }
104         } catch (IOException x) {
105             closesocket0(h);
106             throw x;
107         }
108 
109         this.handle = h;
110         this.iocp = iocp;
111         this.completionKey = key;
112         this.ioCache = new PendingIoCache();
113 
114         // allocate WSABUF arrays
115         this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
116         this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
117     }
118 
WindowsAsynchronousSocketChannelImpl(Iocp iocp)119     WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {
120         this(iocp, true);
121     }
122 
123     @Override
group()124     public AsynchronousChannelGroupImpl group() {
125         return iocp;
126     }
127 
128     /**
129      * Invoked by Iocp when an I/O operation competes.
130      */
131     @Override
getByOverlapped(long overlapped)132     public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
133         return ioCache.remove(overlapped);
134     }
135 
136     // invoked by WindowsAsynchronousServerSocketChannelImpl
handle()137     long handle() {
138         return handle;
139     }
140 
141     // invoked by WindowsAsynchronousServerSocketChannelImpl when new connection
142     // accept
setConnected(InetSocketAddress localAddress, InetSocketAddress remoteAddress)143     void setConnected(InetSocketAddress localAddress,
144                       InetSocketAddress remoteAddress)
145     {
146         synchronized (stateLock) {
147             state = ST_CONNECTED;
148             this.localAddress = localAddress;
149             this.remoteAddress = remoteAddress;
150         }
151     }
152 
153     @Override
implClose()154     void implClose() throws IOException {
155         // close socket (may cause outstanding async I/O operations to fail).
156         closesocket0(handle);
157 
158         // waits until all I/O operations have completed
159         ioCache.close();
160 
161         // release arrays of WSABUF structures
162         unsafe.freeMemory(readBufferArray);
163         unsafe.freeMemory(writeBufferArray);
164 
165         // finally disassociate from the completion port (key can be 0 if
166         // channel created when group is shutdown)
167         if (completionKey != 0)
168             iocp.disassociate(completionKey);
169     }
170 
171     @Override
onCancel(PendingFuture<?,?> task)172     public void onCancel(PendingFuture<?,?> task) {
173         if (task.getContext() instanceof ConnectTask)
174             killConnect();
175         if (task.getContext() instanceof ReadTask)
176             killReading();
177         if (task.getContext() instanceof WriteTask)
178             killWriting();
179     }
180 
181     /**
182      * Implements the task to initiate a connection and the handler to
183      * consume the result when the connection is established (or fails).
184      */
185     private class ConnectTask<A> implements Runnable, Iocp.ResultHandler {
186         private final InetSocketAddress remote;
187         private final PendingFuture<Void,A> result;
188 
ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result)189         ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) {
190             this.remote = remote;
191             this.result = result;
192         }
193 
closeChannel()194         private void closeChannel() {
195             try {
196                 close();
197             } catch (IOException ignore) { }
198         }
199 
toIOException(Throwable x)200         private IOException toIOException(Throwable x) {
201             if (x instanceof IOException) {
202                 if (x instanceof ClosedChannelException)
203                     x = new AsynchronousCloseException();
204                 return (IOException)x;
205             }
206             return new IOException(x);
207         }
208 
209         /**
210          * Invoke after a connection is successfully established.
211          */
afterConnect()212         private void afterConnect() throws IOException {
213             updateConnectContext(handle);
214             synchronized (stateLock) {
215                 state = ST_CONNECTED;
216                 remoteAddress = remote;
217             }
218         }
219 
220         /**
221          * Task to initiate a connection.
222          */
223         @Override
run()224         public void run() {
225             long overlapped = 0L;
226             Throwable exc = null;
227             try {
228                 begin();
229 
230                 // synchronize on result to allow this thread handle the case
231                 // where the connection is established immediately.
232                 synchronized (result) {
233                     overlapped = ioCache.add(result);
234                     // initiate the connection
235                     int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(),
236                                      remote.getPort(), overlapped);
237                     if (n == IOStatus.UNAVAILABLE) {
238                         // connection is pending
239                         return;
240                     }
241 
242                     // connection established immediately
243                     afterConnect();
244                     result.setResult(null);
245                 }
246             } catch (Throwable x) {
247                 if (overlapped != 0L)
248                     ioCache.remove(overlapped);
249                 exc = x;
250             } finally {
251                 end();
252             }
253 
254             if (exc != null) {
255                 closeChannel();
256                 result.setFailure(toIOException(exc));
257             }
258             Invoker.invoke(result);
259         }
260 
261         /**
262          * Invoked by handler thread when connection established.
263          */
264         @Override
completed(int bytesTransferred, boolean canInvokeDirect)265         public void completed(int bytesTransferred, boolean canInvokeDirect) {
266             Throwable exc = null;
267             try {
268                 begin();
269                 afterConnect();
270                 result.setResult(null);
271             } catch (Throwable x) {
272                 // channel is closed or unable to finish connect
273                 exc = x;
274             } finally {
275                 end();
276             }
277 
278             // can't close channel while in begin/end block
279             if (exc != null) {
280                 closeChannel();
281                 result.setFailure(toIOException(exc));
282             }
283 
284             if (canInvokeDirect) {
285                 Invoker.invokeUnchecked(result);
286             } else {
287                 Invoker.invoke(result);
288             }
289         }
290 
291         /**
292          * Invoked by handler thread when failed to establish connection.
293          */
294         @Override
failed(int error, IOException x)295         public void failed(int error, IOException x) {
296             if (isOpen()) {
297                 closeChannel();
298                 result.setFailure(x);
299             } else {
300                 result.setFailure(new AsynchronousCloseException());
301             }
302             Invoker.invoke(result);
303         }
304     }
305 
doPrivilegedBind(final SocketAddress sa)306     private void doPrivilegedBind(final SocketAddress sa) throws IOException {
307         try {
308             AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
309                 public Void run() throws IOException {
310                     bind(sa);
311                     return null;
312                 }
313             });
314         } catch (PrivilegedActionException e) {
315             throw (IOException) e.getException();
316         }
317     }
318 
319     @Override
implConnect(SocketAddress remote, A attachment, CompletionHandler<Void,? super A> handler)320     <A> Future<Void> implConnect(SocketAddress remote,
321                                  A attachment,
322                                  CompletionHandler<Void,? super A> handler)
323     {
324         if (!isOpen()) {
325             Throwable exc = new ClosedChannelException();
326             if (handler == null)
327                 return CompletedFuture.withFailure(exc);
328             Invoker.invoke(this, handler, attachment, null, exc);
329             return null;
330         }
331 
332         InetSocketAddress isa = Net.checkAddress(remote);
333 
334         // permission check
335         SecurityManager sm = System.getSecurityManager();
336         if (sm != null)
337             sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
338 
339         // check and update state
340         // ConnectEx requires the socket to be bound to a local address
341         IOException bindException = null;
342         synchronized (stateLock) {
343             if (state == ST_CONNECTED)
344                 throw new AlreadyConnectedException();
345             if (state == ST_PENDING)
346                 throw new ConnectionPendingException();
347             if (localAddress == null) {
348                 try {
349                     SocketAddress any = new InetSocketAddress(0);
350                     if (sm == null) {
351                         bind(any);
352                     } else {
353                         doPrivilegedBind(any);
354                     }
355                 } catch (IOException x) {
356                     bindException = x;
357                 }
358             }
359             if (bindException == null)
360                 state = ST_PENDING;
361         }
362 
363         // handle bind failure
364         if (bindException != null) {
365             try {
366                 close();
367             } catch (IOException ignore) { }
368             if (handler == null)
369                 return CompletedFuture.withFailure(bindException);
370             Invoker.invoke(this, handler, attachment, null, bindException);
371             return null;
372         }
373 
374         // setup task
375         PendingFuture<Void,A> result =
376             new PendingFuture<Void,A>(this, handler, attachment);
377         ConnectTask<A> task = new ConnectTask<A>(isa, result);
378         result.setContext(task);
379 
380         // initiate I/O
381         if (Iocp.supportsThreadAgnosticIo()) {
382             task.run();
383         } else {
384             Invoker.invokeOnThreadInThreadPool(this, task);
385         }
386         return result;
387     }
388 
389     /**
390      * Implements the task to initiate a read and the handler to consume the
391      * result when the read completes.
392      */
393     private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler {
394         private final ByteBuffer[] bufs;
395         private final int numBufs;
396         private final boolean scatteringRead;
397         private final PendingFuture<V,A> result;
398 
399         // set by run method
400         private ByteBuffer[] shadow;
401 
ReadTask(ByteBuffer[] bufs, boolean scatteringRead, PendingFuture<V,A> result)402         ReadTask(ByteBuffer[] bufs,
403                  boolean scatteringRead,
404                  PendingFuture<V,A> result)
405         {
406             this.bufs = bufs;
407             this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
408             this.scatteringRead = scatteringRead;
409             this.result = result;
410         }
411 
412         /**
413          * Invoked prior to read to prepare the WSABUF array. Where necessary,
414          * it substitutes non-direct buffers with direct buffers.
415          */
prepareBuffers()416         void prepareBuffers() {
417             shadow = new ByteBuffer[numBufs];
418             long address = readBufferArray;
419             for (int i=0; i<numBufs; i++) {
420                 ByteBuffer dst = bufs[i];
421                 int pos = dst.position();
422                 int lim = dst.limit();
423                 assert (pos <= lim);
424                 int rem = (pos <= lim ? lim - pos : 0);
425                 long a;
426                 if (!(dst instanceof DirectBuffer)) {
427                     // substitute with direct buffer
428                     ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
429                     shadow[i] = bb;
430                     a = ((DirectBuffer)bb).address();
431                 } else {
432                     shadow[i] = dst;
433                     a = ((DirectBuffer)dst).address() + pos;
434                 }
435                 unsafe.putAddress(address + OFFSETOF_BUF, a);
436                 unsafe.putInt(address + OFFSETOF_LEN, rem);
437                 address += SIZEOF_WSABUF;
438             }
439         }
440 
441         /**
442          * Invoked after a read has completed to update the buffer positions
443          * and release any substituted buffers.
444          */
updateBuffers(int bytesRead)445         void updateBuffers(int bytesRead) {
446             for (int i=0; i<numBufs; i++) {
447                 ByteBuffer nextBuffer = shadow[i];
448                 int pos = nextBuffer.position();
449                 int len = nextBuffer.remaining();
450                 if (bytesRead >= len) {
451                     bytesRead -= len;
452                     int newPosition = pos + len;
453                     try {
454                         nextBuffer.position(newPosition);
455                     } catch (IllegalArgumentException x) {
456                         // position changed by another
457                     }
458                 } else { // Buffers not completely filled
459                     if (bytesRead > 0) {
460                         assert(pos + bytesRead < (long)Integer.MAX_VALUE);
461                         int newPosition = pos + bytesRead;
462                         try {
463                             nextBuffer.position(newPosition);
464                         } catch (IllegalArgumentException x) {
465                             // position changed by another
466                         }
467                     }
468                     break;
469                 }
470             }
471 
472             // Put results from shadow into the slow buffers
473             for (int i=0; i<numBufs; i++) {
474                 if (!(bufs[i] instanceof DirectBuffer)) {
475                     shadow[i].flip();
476                     try {
477                         bufs[i].put(shadow[i]);
478                     } catch (BufferOverflowException x) {
479                         // position changed by another
480                     }
481                 }
482             }
483         }
484 
releaseBuffers()485         void releaseBuffers() {
486             for (int i=0; i<numBufs; i++) {
487                 if (!(bufs[i] instanceof DirectBuffer)) {
488                     Util.releaseTemporaryDirectBuffer(shadow[i]);
489                 }
490             }
491         }
492 
493         @Override
494         @SuppressWarnings("unchecked")
run()495         public void run() {
496             long overlapped = 0L;
497             boolean prepared = false;
498             boolean pending = false;
499 
500             try {
501                 begin();
502 
503                 // substitute non-direct buffers
504                 prepareBuffers();
505                 prepared = true;
506 
507                 // get an OVERLAPPED structure (from the cache or allocate)
508                 overlapped = ioCache.add(result);
509 
510                 // initiate read
511                 int n = read0(handle, numBufs, readBufferArray, overlapped);
512                 if (n == IOStatus.UNAVAILABLE) {
513                     // I/O is pending
514                     pending = true;
515                     return;
516                 }
517                 if (n == IOStatus.EOF) {
518                     // input shutdown
519                     enableReading();
520                     if (scatteringRead) {
521                         result.setResult((V)Long.valueOf(-1L));
522                     } else {
523                         result.setResult((V)Integer.valueOf(-1));
524                     }
525                 } else {
526                     throw new InternalError("Read completed immediately");
527                 }
528             } catch (Throwable x) {
529                 // failed to initiate read
530                 // reset read flag before releasing waiters
531                 enableReading();
532                 if (x instanceof ClosedChannelException)
533                     x = new AsynchronousCloseException();
534                 if (!(x instanceof IOException))
535                     x = new IOException(x);
536                 result.setFailure(x);
537             } finally {
538                 // release resources if I/O not pending
539                 if (!pending) {
540                     if (overlapped != 0L)
541                         ioCache.remove(overlapped);
542                     if (prepared)
543                         releaseBuffers();
544                 }
545                 end();
546             }
547 
548             // invoke completion handler
549             Invoker.invoke(result);
550         }
551 
552         /**
553          * Executed when the I/O has completed
554          */
555         @Override
556         @SuppressWarnings("unchecked")
completed(int bytesTransferred, boolean canInvokeDirect)557         public void completed(int bytesTransferred, boolean canInvokeDirect) {
558             if (bytesTransferred == 0) {
559                 bytesTransferred = -1;  // EOF
560             } else {
561                 updateBuffers(bytesTransferred);
562             }
563 
564             // return direct buffer to cache if substituted
565             releaseBuffers();
566 
567             // release waiters if not already released by timeout
568             synchronized (result) {
569                 if (result.isDone())
570                     return;
571                 enableReading();
572                 if (scatteringRead) {
573                     result.setResult((V)Long.valueOf(bytesTransferred));
574                 } else {
575                     result.setResult((V)Integer.valueOf(bytesTransferred));
576                 }
577             }
578             if (canInvokeDirect) {
579                 Invoker.invokeUnchecked(result);
580             } else {
581                 Invoker.invoke(result);
582             }
583         }
584 
585         @Override
failed(int error, IOException x)586         public void failed(int error, IOException x) {
587             // return direct buffer to cache if substituted
588             releaseBuffers();
589 
590             // release waiters if not already released by timeout
591             if (!isOpen())
592                 x = new AsynchronousCloseException();
593 
594             synchronized (result) {
595                 if (result.isDone())
596                     return;
597                 enableReading();
598                 result.setFailure(x);
599             }
600             Invoker.invoke(result);
601         }
602 
603         /**
604          * Invoked if timeout expires before it is cancelled
605          */
timeout()606         void timeout() {
607             // synchronize on result as the I/O could complete/fail
608             synchronized (result) {
609                 if (result.isDone())
610                     return;
611 
612                 // kill further reading before releasing waiters
613                 enableReading(true);
614                 result.setFailure(new InterruptedByTimeoutException());
615             }
616 
617             // invoke handler without any locks
618             Invoker.invoke(result);
619         }
620     }
621 
622     @Override
implRead(boolean isScatteringRead, ByteBuffer dst, ByteBuffer[] dsts, long timeout, TimeUnit unit, A attachment, CompletionHandler<V,? super A> handler)623     <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
624                                             ByteBuffer dst,
625                                             ByteBuffer[] dsts,
626                                             long timeout,
627                                             TimeUnit unit,
628                                             A attachment,
629                                             CompletionHandler<V,? super A> handler)
630     {
631         // setup task
632         PendingFuture<V,A> result =
633             new PendingFuture<V,A>(this, handler, attachment);
634         ByteBuffer[] bufs;
635         if (isScatteringRead) {
636             bufs = dsts;
637         } else {
638             bufs = new ByteBuffer[1];
639             bufs[0] = dst;
640         }
641         final ReadTask<V,A> readTask =
642                 new ReadTask<V,A>(bufs, isScatteringRead, result);
643         result.setContext(readTask);
644 
645         // schedule timeout
646         if (timeout > 0L) {
647             Future<?> timeoutTask = iocp.schedule(new Runnable() {
648                 public void run() {
649                     readTask.timeout();
650                 }
651             }, timeout, unit);
652             result.setTimeoutTask(timeoutTask);
653         }
654 
655         // initiate I/O
656         if (Iocp.supportsThreadAgnosticIo()) {
657             readTask.run();
658         } else {
659             Invoker.invokeOnThreadInThreadPool(this, readTask);
660         }
661         return result;
662     }
663 
664     /**
665      * Implements the task to initiate a write and the handler to consume the
666      * result when the write completes.
667      */
668     private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler {
669         private final ByteBuffer[] bufs;
670         private final int numBufs;
671         private final boolean gatheringWrite;
672         private final PendingFuture<V,A> result;
673 
674         // set by run method
675         private ByteBuffer[] shadow;
676 
WriteTask(ByteBuffer[] bufs, boolean gatheringWrite, PendingFuture<V,A> result)677         WriteTask(ByteBuffer[] bufs,
678                   boolean gatheringWrite,
679                   PendingFuture<V,A> result)
680         {
681             this.bufs = bufs;
682             this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
683             this.gatheringWrite = gatheringWrite;
684             this.result = result;
685         }
686 
687         /**
688          * Invoked prior to write to prepare the WSABUF array. Where necessary,
689          * it substitutes non-direct buffers with direct buffers.
690          */
prepareBuffers()691         void prepareBuffers() {
692             shadow = new ByteBuffer[numBufs];
693             long address = writeBufferArray;
694             for (int i=0; i<numBufs; i++) {
695                 ByteBuffer src = bufs[i];
696                 int pos = src.position();
697                 int lim = src.limit();
698                 assert (pos <= lim);
699                 int rem = (pos <= lim ? lim - pos : 0);
700                 long a;
701                 if (!(src instanceof DirectBuffer)) {
702                     // substitute with direct buffer
703                     ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
704                     bb.put(src);
705                     bb.flip();
706                     src.position(pos);  // leave heap buffer untouched for now
707                     shadow[i] = bb;
708                     a = ((DirectBuffer)bb).address();
709                 } else {
710                     shadow[i] = src;
711                     a = ((DirectBuffer)src).address() + pos;
712                 }
713                 unsafe.putAddress(address + OFFSETOF_BUF, a);
714                 unsafe.putInt(address + OFFSETOF_LEN, rem);
715                 address += SIZEOF_WSABUF;
716             }
717         }
718 
719         /**
720          * Invoked after a write has completed to update the buffer positions
721          * and release any substituted buffers.
722          */
updateBuffers(int bytesWritten)723         void updateBuffers(int bytesWritten) {
724             // Notify the buffers how many bytes were taken
725             for (int i=0; i<numBufs; i++) {
726                 ByteBuffer nextBuffer = bufs[i];
727                 int pos = nextBuffer.position();
728                 int lim = nextBuffer.limit();
729                 int len = (pos <= lim ? lim - pos : lim);
730                 if (bytesWritten >= len) {
731                     bytesWritten -= len;
732                     int newPosition = pos + len;
733                     try {
734                         nextBuffer.position(newPosition);
735                     } catch (IllegalArgumentException x) {
736                         // position changed by someone else
737                     }
738                 } else { // Buffers not completely filled
739                     if (bytesWritten > 0) {
740                         assert(pos + bytesWritten < (long)Integer.MAX_VALUE);
741                         int newPosition = pos + bytesWritten;
742                         try {
743                             nextBuffer.position(newPosition);
744                         } catch (IllegalArgumentException x) {
745                             // position changed by someone else
746                         }
747                     }
748                     break;
749                 }
750             }
751         }
752 
releaseBuffers()753         void releaseBuffers() {
754             for (int i=0; i<numBufs; i++) {
755                 if (!(bufs[i] instanceof DirectBuffer)) {
756                     Util.releaseTemporaryDirectBuffer(shadow[i]);
757                 }
758             }
759         }
760 
761         @Override
762         //@SuppressWarnings("unchecked")
run()763         public void run() {
764             long overlapped = 0L;
765             boolean prepared = false;
766             boolean pending = false;
767             boolean shutdown = false;
768 
769             try {
770                 begin();
771 
772                 // substitute non-direct buffers
773                 prepareBuffers();
774                 prepared = true;
775 
776                 // get an OVERLAPPED structure (from the cache or allocate)
777                 overlapped = ioCache.add(result);
778                 int n = write0(handle, numBufs, writeBufferArray, overlapped);
779                 if (n == IOStatus.UNAVAILABLE) {
780                     // I/O is pending
781                     pending = true;
782                     return;
783                 }
784                 if (n == IOStatus.EOF) {
785                     // special case for shutdown output
786                     shutdown = true;
787                     throw new ClosedChannelException();
788                 }
789                 // write completed immediately
790                 throw new InternalError("Write completed immediately");
791             } catch (Throwable x) {
792                 // write failed. Enable writing before releasing waiters.
793                 enableWriting();
794                 if (!shutdown && (x instanceof ClosedChannelException))
795                     x = new AsynchronousCloseException();
796                 if (!(x instanceof IOException))
797                     x = new IOException(x);
798                 result.setFailure(x);
799             } finally {
800                 // release resources if I/O not pending
801                 if (!pending) {
802                     if (overlapped != 0L)
803                         ioCache.remove(overlapped);
804                     if (prepared)
805                         releaseBuffers();
806                 }
807                 end();
808             }
809 
810             // invoke completion handler
811             Invoker.invoke(result);
812         }
813 
814         /**
815          * Executed when the I/O has completed
816          */
817         @Override
818         @SuppressWarnings("unchecked")
completed(int bytesTransferred, boolean canInvokeDirect)819         public void completed(int bytesTransferred, boolean canInvokeDirect) {
820             updateBuffers(bytesTransferred);
821 
822             // return direct buffer to cache if substituted
823             releaseBuffers();
824 
825             // release waiters if not already released by timeout
826             synchronized (result) {
827                 if (result.isDone())
828                     return;
829                 enableWriting();
830                 if (gatheringWrite) {
831                     result.setResult((V)Long.valueOf(bytesTransferred));
832                 } else {
833                     result.setResult((V)Integer.valueOf(bytesTransferred));
834                 }
835             }
836             if (canInvokeDirect) {
837                 Invoker.invokeUnchecked(result);
838             } else {
839                 Invoker.invoke(result);
840             }
841         }
842 
843         @Override
failed(int error, IOException x)844         public void failed(int error, IOException x) {
845             // return direct buffer to cache if substituted
846             releaseBuffers();
847 
848             // release waiters if not already released by timeout
849             if (!isOpen())
850                 x = new AsynchronousCloseException();
851 
852             synchronized (result) {
853                 if (result.isDone())
854                     return;
855                 enableWriting();
856                 result.setFailure(x);
857             }
858             Invoker.invoke(result);
859         }
860 
861         /**
862          * Invoked if timeout expires before it is cancelled
863          */
timeout()864         void timeout() {
865             // synchronize on result as the I/O could complete/fail
866             synchronized (result) {
867                 if (result.isDone())
868                     return;
869 
870                 // kill further writing before releasing waiters
871                 enableWriting(true);
872                 result.setFailure(new InterruptedByTimeoutException());
873             }
874 
875             // invoke handler without any locks
876             Invoker.invoke(result);
877         }
878     }
879 
880     @Override
implWrite(boolean gatheringWrite, ByteBuffer src, ByteBuffer[] srcs, long timeout, TimeUnit unit, A attachment, CompletionHandler<V,? super A> handler)881     <V extends Number,A> Future<V> implWrite(boolean gatheringWrite,
882                                              ByteBuffer src,
883                                              ByteBuffer[] srcs,
884                                              long timeout,
885                                              TimeUnit unit,
886                                              A attachment,
887                                              CompletionHandler<V,? super A> handler)
888     {
889         // setup task
890         PendingFuture<V,A> result =
891             new PendingFuture<V,A>(this, handler, attachment);
892         ByteBuffer[] bufs;
893         if (gatheringWrite) {
894             bufs = srcs;
895         } else {
896             bufs = new ByteBuffer[1];
897             bufs[0] = src;
898         }
899         final WriteTask<V,A> writeTask =
900                 new WriteTask<V,A>(bufs, gatheringWrite, result);
901         result.setContext(writeTask);
902 
903         // schedule timeout
904         if (timeout > 0L) {
905             Future<?> timeoutTask = iocp.schedule(new Runnable() {
906                 public void run() {
907                     writeTask.timeout();
908                 }
909             }, timeout, unit);
910             result.setTimeoutTask(timeoutTask);
911         }
912 
913         // initiate I/O (can only be done from thread in thread pool)
914         // initiate I/O
915         if (Iocp.supportsThreadAgnosticIo()) {
916             writeTask.run();
917         } else {
918             Invoker.invokeOnThreadInThreadPool(this, writeTask);
919         }
920         return result;
921     }
922 
923     // -- Native methods --
924 
initIDs()925     private static native void initIDs();
926 
connect0(long socket, boolean preferIPv6, InetAddress remote, int remotePort, long overlapped)927     private static native int connect0(long socket, boolean preferIPv6,
928         InetAddress remote, int remotePort, long overlapped) throws IOException;
929 
updateConnectContext(long socket)930     private static native void updateConnectContext(long socket) throws IOException;
931 
read0(long socket, int count, long addres, long overlapped)932     private static native int read0(long socket, int count, long addres, long overlapped)
933         throws IOException;
934 
write0(long socket, int count, long address, long overlapped)935     private static native int write0(long socket, int count, long address,
936         long overlapped) throws IOException;
937 
shutdown0(long socket, int how)938     private static native void shutdown0(long socket, int how) throws IOException;
939 
closesocket0(long socket)940     private static native void closesocket0(long socket) throws IOException;
941 
942     static {
IOUtil.load()943         IOUtil.load();
initIDs()944         initIDs();
945     }
946 }
947