1 /* 2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.*; 29 import java.nio.ByteBuffer; 30 import java.nio.BufferOverflowException; 31 import java.net.*; 32 import java.util.concurrent.*; 33 import java.io.IOException; 34 import java.security.AccessController; 35 import java.security.PrivilegedActionException; 36 import java.security.PrivilegedExceptionAction; 37 import jdk.internal.misc.Unsafe; 38 import sun.net.util.SocketExceptions; 39 40 /** 41 * Windows implementation of AsynchronousSocketChannel using overlapped I/O. 42 */ 43 44 class WindowsAsynchronousSocketChannelImpl 45 extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel 46 { 47 private static final Unsafe unsafe = Unsafe.getUnsafe(); 48 private static int addressSize = unsafe.addressSize(); 49 dependsArch(int value32, int value64)50 private static int dependsArch(int value32, int value64) { 51 return (addressSize == 4) ? value32 : value64; 52 } 53 54 /* 55 * typedef struct _WSABUF { 56 * u_long len; 57 * char FAR * buf; 58 * } WSABUF; 59 */ 60 private static final int SIZEOF_WSABUF = dependsArch(8, 16); 61 private static final int OFFSETOF_LEN = 0; 62 private static final int OFFSETOF_BUF = dependsArch(4, 8); 63 64 // maximum vector size for scatter/gather I/O 65 private static final int MAX_WSABUF = 16; 66 67 private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF; 68 69 70 // socket handle. Use begin()/end() around each usage of this handle. 71 final long handle; 72 73 // I/O completion port that the socket is associated with 74 private final Iocp iocp; 75 76 // completion key to identify channel when I/O completes 77 private final int completionKey; 78 79 // Pending I/O operations are tied to an OVERLAPPED structure that can only 80 // be released when the I/O completion event is posted to the completion 81 // port. Where I/O operations complete immediately then it is possible 82 // there may be more than two OVERLAPPED structures in use. 83 private final PendingIoCache ioCache; 84 85 // per-channel arrays of WSABUF structures 86 private final long readBufferArray; 87 private final long writeBufferArray; 88 89 WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)90 WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown) 91 throws IOException 92 { 93 super(iocp); 94 95 // associate socket with default completion port 96 long h = IOUtil.fdVal(fd); 97 int key = 0; 98 try { 99 key = iocp.associate(this, h); 100 } catch (ShutdownChannelGroupException x) { 101 if (failIfGroupShutdown) { 102 closesocket0(h); 103 throw x; 104 } 105 } catch (IOException x) { 106 closesocket0(h); 107 throw x; 108 } 109 110 this.handle = h; 111 this.iocp = iocp; 112 this.completionKey = key; 113 this.ioCache = new PendingIoCache(); 114 115 // allocate WSABUF arrays 116 this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY); 117 this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY); 118 } 119 WindowsAsynchronousSocketChannelImpl(Iocp iocp)120 WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException { 121 this(iocp, true); 122 } 123 124 @Override group()125 public AsynchronousChannelGroupImpl group() { 126 return iocp; 127 } 128 129 /** 130 * Invoked by Iocp when an I/O operation competes. 131 */ 132 @Override getByOverlapped(long overlapped)133 public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) { 134 return ioCache.remove(overlapped); 135 } 136 137 // invoked by WindowsAsynchronousServerSocketChannelImpl handle()138 long handle() { 139 return handle; 140 } 141 142 // invoked by WindowsAsynchronousServerSocketChannelImpl when new connection 143 // accept setConnected(InetSocketAddress localAddress, InetSocketAddress remoteAddress)144 void setConnected(InetSocketAddress localAddress, 145 InetSocketAddress remoteAddress) 146 { 147 synchronized (stateLock) { 148 state = ST_CONNECTED; 149 this.localAddress = localAddress; 150 this.remoteAddress = remoteAddress; 151 } 152 } 153 154 @Override implClose()155 void implClose() throws IOException { 156 // close socket (may cause outstanding async I/O operations to fail). 157 closesocket0(handle); 158 159 // waits until all I/O operations have completed 160 ioCache.close(); 161 162 // release arrays of WSABUF structures 163 unsafe.freeMemory(readBufferArray); 164 unsafe.freeMemory(writeBufferArray); 165 166 // finally disassociate from the completion port (key can be 0 if 167 // channel created when group is shutdown) 168 if (completionKey != 0) 169 iocp.disassociate(completionKey); 170 } 171 172 @Override onCancel(PendingFuture<?,?> task)173 public void onCancel(PendingFuture<?,?> task) { 174 if (task.getContext() instanceof ConnectTask) 175 killConnect(); 176 if (task.getContext() instanceof ReadTask) 177 killReading(); 178 if (task.getContext() instanceof WriteTask) 179 killWriting(); 180 } 181 182 /** 183 * Implements the task to initiate a connection and the handler to 184 * consume the result when the connection is established (or fails). 185 */ 186 private class ConnectTask<A> implements Runnable, Iocp.ResultHandler { 187 private final InetSocketAddress remote; 188 private final PendingFuture<Void,A> result; 189 ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result)190 ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) { 191 this.remote = remote; 192 this.result = result; 193 } 194 closeChannel()195 private void closeChannel() { 196 try { 197 close(); 198 } catch (IOException ignore) { } 199 } 200 toIOException(Throwable x)201 private IOException toIOException(Throwable x) { 202 if (x instanceof IOException) { 203 if (x instanceof ClosedChannelException) 204 x = new AsynchronousCloseException(); 205 return (IOException)x; 206 } 207 return new IOException(x); 208 } 209 210 /** 211 * Invoke after a connection is successfully established. 212 */ afterConnect()213 private void afterConnect() throws IOException { 214 updateConnectContext(handle); 215 synchronized (stateLock) { 216 state = ST_CONNECTED; 217 remoteAddress = remote; 218 } 219 } 220 221 /** 222 * Task to initiate a connection. 223 */ 224 @Override run()225 public void run() { 226 long overlapped = 0L; 227 Throwable exc = null; 228 try { 229 begin(); 230 231 // synchronize on result to allow this thread handle the case 232 // where the connection is established immediately. 233 synchronized (result) { 234 overlapped = ioCache.add(result); 235 // initiate the connection 236 int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(), 237 remote.getPort(), overlapped); 238 if (n == IOStatus.UNAVAILABLE) { 239 // connection is pending 240 return; 241 } 242 243 // connection established immediately 244 afterConnect(); 245 result.setResult(null); 246 } 247 } catch (Throwable x) { 248 if (overlapped != 0L) 249 ioCache.remove(overlapped); 250 exc = x; 251 } finally { 252 end(); 253 } 254 255 if (exc != null) { 256 closeChannel(); 257 exc = SocketExceptions.of(toIOException(exc), remote); 258 result.setFailure(exc); 259 } 260 Invoker.invoke(result); 261 } 262 263 /** 264 * Invoked by handler thread when connection established. 265 */ 266 @Override completed(int bytesTransferred, boolean canInvokeDirect)267 public void completed(int bytesTransferred, boolean canInvokeDirect) { 268 Throwable exc = null; 269 try { 270 begin(); 271 afterConnect(); 272 result.setResult(null); 273 } catch (Throwable x) { 274 // channel is closed or unable to finish connect 275 exc = x; 276 } finally { 277 end(); 278 } 279 280 // can't close channel while in begin/end block 281 if (exc != null) { 282 closeChannel(); 283 IOException ee = toIOException(exc); 284 ee = SocketExceptions.of(ee, remote); 285 result.setFailure(ee); 286 } 287 288 if (canInvokeDirect) { 289 Invoker.invokeUnchecked(result); 290 } else { 291 Invoker.invoke(result); 292 } 293 } 294 295 /** 296 * Invoked by handler thread when failed to establish connection. 297 */ 298 @Override failed(int error, IOException x)299 public void failed(int error, IOException x) { 300 x = SocketExceptions.of(x, remote); 301 if (isOpen()) { 302 closeChannel(); 303 result.setFailure(x); 304 } else { 305 x = SocketExceptions.of(new AsynchronousCloseException(), remote); 306 result.setFailure(x); 307 } 308 Invoker.invoke(result); 309 } 310 } 311 doPrivilegedBind(final SocketAddress sa)312 private void doPrivilegedBind(final SocketAddress sa) throws IOException { 313 try { 314 AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() { 315 public Void run() throws IOException { 316 bind(sa); 317 return null; 318 } 319 }); 320 } catch (PrivilegedActionException e) { 321 throw (IOException) e.getException(); 322 } 323 } 324 325 @Override implConnect(SocketAddress remote, A attachment, CompletionHandler<Void,? super A> handler)326 <A> Future<Void> implConnect(SocketAddress remote, 327 A attachment, 328 CompletionHandler<Void,? super A> handler) 329 { 330 if (!isOpen()) { 331 Throwable exc = new ClosedChannelException(); 332 if (handler == null) 333 return CompletedFuture.withFailure(exc); 334 Invoker.invoke(this, handler, attachment, null, exc); 335 return null; 336 } 337 338 InetSocketAddress isa = Net.checkAddress(remote); 339 340 // permission check 341 SecurityManager sm = System.getSecurityManager(); 342 if (sm != null) 343 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); 344 345 // check and update state 346 // ConnectEx requires the socket to be bound to a local address 347 IOException bindException = null; 348 synchronized (stateLock) { 349 if (state == ST_CONNECTED) 350 throw new AlreadyConnectedException(); 351 if (state == ST_PENDING) 352 throw new ConnectionPendingException(); 353 if (localAddress == null) { 354 try { 355 SocketAddress any = new InetSocketAddress(0); 356 if (sm == null) { 357 bind(any); 358 } else { 359 doPrivilegedBind(any); 360 } 361 } catch (IOException x) { 362 bindException = x; 363 } 364 } 365 if (bindException == null) 366 state = ST_PENDING; 367 } 368 369 // handle bind failure 370 if (bindException != null) { 371 try { 372 close(); 373 } catch (IOException ignore) { } 374 if (handler == null) 375 return CompletedFuture.withFailure(bindException); 376 Invoker.invoke(this, handler, attachment, null, bindException); 377 return null; 378 } 379 380 // setup task 381 PendingFuture<Void,A> result = 382 new PendingFuture<Void,A>(this, handler, attachment); 383 ConnectTask<A> task = new ConnectTask<A>(isa, result); 384 result.setContext(task); 385 386 // initiate I/O 387 task.run(); 388 return result; 389 } 390 391 /** 392 * Implements the task to initiate a read and the handler to consume the 393 * result when the read completes. 394 */ 395 private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler { 396 private final ByteBuffer[] bufs; 397 private final int numBufs; 398 private final boolean scatteringRead; 399 private final PendingFuture<V,A> result; 400 401 // set by run method 402 private ByteBuffer[] shadow; 403 ReadTask(ByteBuffer[] bufs, boolean scatteringRead, PendingFuture<V,A> result)404 ReadTask(ByteBuffer[] bufs, 405 boolean scatteringRead, 406 PendingFuture<V,A> result) 407 { 408 this.bufs = bufs; 409 this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length; 410 this.scatteringRead = scatteringRead; 411 this.result = result; 412 } 413 414 /** 415 * Invoked prior to read to prepare the WSABUF array. Where necessary, 416 * it substitutes non-direct buffers with direct buffers. 417 */ prepareBuffers()418 void prepareBuffers() { 419 shadow = new ByteBuffer[numBufs]; 420 long address = readBufferArray; 421 for (int i=0; i<numBufs; i++) { 422 ByteBuffer dst = bufs[i]; 423 int pos = dst.position(); 424 int lim = dst.limit(); 425 assert (pos <= lim); 426 int rem = (pos <= lim ? lim - pos : 0); 427 long a; 428 if (!(dst instanceof DirectBuffer)) { 429 // substitute with direct buffer 430 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 431 shadow[i] = bb; 432 a = ((DirectBuffer)bb).address(); 433 } else { 434 shadow[i] = dst; 435 a = ((DirectBuffer)dst).address() + pos; 436 } 437 unsafe.putAddress(address + OFFSETOF_BUF, a); 438 unsafe.putInt(address + OFFSETOF_LEN, rem); 439 address += SIZEOF_WSABUF; 440 } 441 } 442 443 /** 444 * Invoked after a read has completed to update the buffer positions 445 * and release any substituted buffers. 446 */ updateBuffers(int bytesRead)447 void updateBuffers(int bytesRead) { 448 for (int i=0; i<numBufs; i++) { 449 ByteBuffer nextBuffer = shadow[i]; 450 int pos = nextBuffer.position(); 451 int len = nextBuffer.remaining(); 452 if (bytesRead >= len) { 453 bytesRead -= len; 454 int newPosition = pos + len; 455 try { 456 nextBuffer.position(newPosition); 457 } catch (IllegalArgumentException x) { 458 // position changed by another 459 } 460 } else { // Buffers not completely filled 461 if (bytesRead > 0) { 462 assert(pos + bytesRead < (long)Integer.MAX_VALUE); 463 int newPosition = pos + bytesRead; 464 try { 465 nextBuffer.position(newPosition); 466 } catch (IllegalArgumentException x) { 467 // position changed by another 468 } 469 } 470 break; 471 } 472 } 473 474 // Put results from shadow into the slow buffers 475 for (int i=0; i<numBufs; i++) { 476 if (!(bufs[i] instanceof DirectBuffer)) { 477 shadow[i].flip(); 478 try { 479 bufs[i].put(shadow[i]); 480 } catch (BufferOverflowException x) { 481 // position changed by another 482 } 483 } 484 } 485 } 486 releaseBuffers()487 void releaseBuffers() { 488 for (int i=0; i<numBufs; i++) { 489 if (!(bufs[i] instanceof DirectBuffer)) { 490 Util.releaseTemporaryDirectBuffer(shadow[i]); 491 } 492 } 493 } 494 495 @Override 496 @SuppressWarnings("unchecked") run()497 public void run() { 498 long overlapped = 0L; 499 boolean prepared = false; 500 boolean pending = false; 501 502 try { 503 begin(); 504 505 // substitute non-direct buffers 506 prepareBuffers(); 507 prepared = true; 508 509 // get an OVERLAPPED structure (from the cache or allocate) 510 overlapped = ioCache.add(result); 511 512 // initiate read 513 int n = read0(handle, numBufs, readBufferArray, overlapped); 514 if (n == IOStatus.UNAVAILABLE) { 515 // I/O is pending 516 pending = true; 517 return; 518 } 519 if (n == IOStatus.EOF) { 520 // input shutdown 521 enableReading(); 522 if (scatteringRead) { 523 result.setResult((V)Long.valueOf(-1L)); 524 } else { 525 result.setResult((V)Integer.valueOf(-1)); 526 } 527 } else { 528 throw new InternalError("Read completed immediately"); 529 } 530 } catch (Throwable x) { 531 // failed to initiate read 532 // reset read flag before releasing waiters 533 enableReading(); 534 if (x instanceof ClosedChannelException) 535 x = new AsynchronousCloseException(); 536 if (!(x instanceof IOException)) 537 x = new IOException(x); 538 result.setFailure(x); 539 } finally { 540 // release resources if I/O not pending 541 if (!pending) { 542 if (overlapped != 0L) 543 ioCache.remove(overlapped); 544 if (prepared) 545 releaseBuffers(); 546 } 547 end(); 548 } 549 550 // invoke completion handler 551 Invoker.invoke(result); 552 } 553 554 /** 555 * Executed when the I/O has completed 556 */ 557 @Override 558 @SuppressWarnings("unchecked") completed(int bytesTransferred, boolean canInvokeDirect)559 public void completed(int bytesTransferred, boolean canInvokeDirect) { 560 if (bytesTransferred == 0) { 561 bytesTransferred = -1; // EOF 562 } else { 563 updateBuffers(bytesTransferred); 564 } 565 566 // return direct buffer to cache if substituted 567 releaseBuffers(); 568 569 // release waiters if not already released by timeout 570 synchronized (result) { 571 if (result.isDone()) 572 return; 573 enableReading(); 574 if (scatteringRead) { 575 result.setResult((V)Long.valueOf(bytesTransferred)); 576 } else { 577 result.setResult((V)Integer.valueOf(bytesTransferred)); 578 } 579 } 580 if (canInvokeDirect) { 581 Invoker.invokeUnchecked(result); 582 } else { 583 Invoker.invoke(result); 584 } 585 } 586 587 @Override failed(int error, IOException x)588 public void failed(int error, IOException x) { 589 // return direct buffer to cache if substituted 590 releaseBuffers(); 591 592 // release waiters if not already released by timeout 593 if (!isOpen()) 594 x = new AsynchronousCloseException(); 595 596 synchronized (result) { 597 if (result.isDone()) 598 return; 599 enableReading(); 600 result.setFailure(x); 601 } 602 Invoker.invoke(result); 603 } 604 605 /** 606 * Invoked if timeout expires before it is cancelled 607 */ timeout()608 void timeout() { 609 // synchronize on result as the I/O could complete/fail 610 synchronized (result) { 611 if (result.isDone()) 612 return; 613 614 // kill further reading before releasing waiters 615 enableReading(true); 616 result.setFailure(new InterruptedByTimeoutException()); 617 } 618 619 // invoke handler without any locks 620 Invoker.invoke(result); 621 } 622 } 623 624 @Override implRead(boolean isScatteringRead, ByteBuffer dst, ByteBuffer[] dsts, long timeout, TimeUnit unit, A attachment, CompletionHandler<V,? super A> handler)625 <V extends Number,A> Future<V> implRead(boolean isScatteringRead, 626 ByteBuffer dst, 627 ByteBuffer[] dsts, 628 long timeout, 629 TimeUnit unit, 630 A attachment, 631 CompletionHandler<V,? super A> handler) 632 { 633 // setup task 634 PendingFuture<V,A> result = 635 new PendingFuture<V,A>(this, handler, attachment); 636 ByteBuffer[] bufs; 637 if (isScatteringRead) { 638 bufs = dsts; 639 } else { 640 bufs = new ByteBuffer[1]; 641 bufs[0] = dst; 642 } 643 final ReadTask<V,A> readTask = 644 new ReadTask<V,A>(bufs, isScatteringRead, result); 645 result.setContext(readTask); 646 647 // schedule timeout 648 if (timeout > 0L) { 649 Future<?> timeoutTask = iocp.schedule(new Runnable() { 650 public void run() { 651 readTask.timeout(); 652 } 653 }, timeout, unit); 654 result.setTimeoutTask(timeoutTask); 655 } 656 657 // initiate I/O 658 readTask.run(); 659 return result; 660 } 661 662 /** 663 * Implements the task to initiate a write and the handler to consume the 664 * result when the write completes. 665 */ 666 private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler { 667 private final ByteBuffer[] bufs; 668 private final int numBufs; 669 private final boolean gatheringWrite; 670 private final PendingFuture<V,A> result; 671 672 // set by run method 673 private ByteBuffer[] shadow; 674 WriteTask(ByteBuffer[] bufs, boolean gatheringWrite, PendingFuture<V,A> result)675 WriteTask(ByteBuffer[] bufs, 676 boolean gatheringWrite, 677 PendingFuture<V,A> result) 678 { 679 this.bufs = bufs; 680 this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length; 681 this.gatheringWrite = gatheringWrite; 682 this.result = result; 683 } 684 685 /** 686 * Invoked prior to write to prepare the WSABUF array. Where necessary, 687 * it substitutes non-direct buffers with direct buffers. 688 */ prepareBuffers()689 void prepareBuffers() { 690 shadow = new ByteBuffer[numBufs]; 691 long address = writeBufferArray; 692 for (int i=0; i<numBufs; i++) { 693 ByteBuffer src = bufs[i]; 694 int pos = src.position(); 695 int lim = src.limit(); 696 assert (pos <= lim); 697 int rem = (pos <= lim ? lim - pos : 0); 698 long a; 699 if (!(src instanceof DirectBuffer)) { 700 // substitute with direct buffer 701 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 702 bb.put(src); 703 bb.flip(); 704 src.position(pos); // leave heap buffer untouched for now 705 shadow[i] = bb; 706 a = ((DirectBuffer)bb).address(); 707 } else { 708 shadow[i] = src; 709 a = ((DirectBuffer)src).address() + pos; 710 } 711 unsafe.putAddress(address + OFFSETOF_BUF, a); 712 unsafe.putInt(address + OFFSETOF_LEN, rem); 713 address += SIZEOF_WSABUF; 714 } 715 } 716 717 /** 718 * Invoked after a write has completed to update the buffer positions 719 * and release any substituted buffers. 720 */ updateBuffers(int bytesWritten)721 void updateBuffers(int bytesWritten) { 722 // Notify the buffers how many bytes were taken 723 for (int i=0; i<numBufs; i++) { 724 ByteBuffer nextBuffer = bufs[i]; 725 int pos = nextBuffer.position(); 726 int lim = nextBuffer.limit(); 727 int len = (pos <= lim ? lim - pos : lim); 728 if (bytesWritten >= len) { 729 bytesWritten -= len; 730 int newPosition = pos + len; 731 try { 732 nextBuffer.position(newPosition); 733 } catch (IllegalArgumentException x) { 734 // position changed by someone else 735 } 736 } else { // Buffers not completely filled 737 if (bytesWritten > 0) { 738 assert(pos + bytesWritten < (long)Integer.MAX_VALUE); 739 int newPosition = pos + bytesWritten; 740 try { 741 nextBuffer.position(newPosition); 742 } catch (IllegalArgumentException x) { 743 // position changed by someone else 744 } 745 } 746 break; 747 } 748 } 749 } 750 releaseBuffers()751 void releaseBuffers() { 752 for (int i=0; i<numBufs; i++) { 753 if (!(bufs[i] instanceof DirectBuffer)) { 754 Util.releaseTemporaryDirectBuffer(shadow[i]); 755 } 756 } 757 } 758 759 @Override 760 //@SuppressWarnings("unchecked") run()761 public void run() { 762 long overlapped = 0L; 763 boolean prepared = false; 764 boolean pending = false; 765 boolean shutdown = false; 766 767 try { 768 begin(); 769 770 // substitute non-direct buffers 771 prepareBuffers(); 772 prepared = true; 773 774 // get an OVERLAPPED structure (from the cache or allocate) 775 overlapped = ioCache.add(result); 776 int n = write0(handle, numBufs, writeBufferArray, overlapped); 777 if (n == IOStatus.UNAVAILABLE) { 778 // I/O is pending 779 pending = true; 780 return; 781 } 782 if (n == IOStatus.EOF) { 783 // special case for shutdown output 784 shutdown = true; 785 throw new ClosedChannelException(); 786 } 787 // write completed immediately 788 throw new InternalError("Write completed immediately"); 789 } catch (Throwable x) { 790 // write failed. Enable writing before releasing waiters. 791 enableWriting(); 792 if (!shutdown && (x instanceof ClosedChannelException)) 793 x = new AsynchronousCloseException(); 794 if (!(x instanceof IOException)) 795 x = new IOException(x); 796 result.setFailure(x); 797 } finally { 798 // release resources if I/O not pending 799 if (!pending) { 800 if (overlapped != 0L) 801 ioCache.remove(overlapped); 802 if (prepared) 803 releaseBuffers(); 804 } 805 end(); 806 } 807 808 // invoke completion handler 809 Invoker.invoke(result); 810 } 811 812 /** 813 * Executed when the I/O has completed 814 */ 815 @Override 816 @SuppressWarnings("unchecked") completed(int bytesTransferred, boolean canInvokeDirect)817 public void completed(int bytesTransferred, boolean canInvokeDirect) { 818 updateBuffers(bytesTransferred); 819 820 // return direct buffer to cache if substituted 821 releaseBuffers(); 822 823 // release waiters if not already released by timeout 824 synchronized (result) { 825 if (result.isDone()) 826 return; 827 enableWriting(); 828 if (gatheringWrite) { 829 result.setResult((V)Long.valueOf(bytesTransferred)); 830 } else { 831 result.setResult((V)Integer.valueOf(bytesTransferred)); 832 } 833 } 834 if (canInvokeDirect) { 835 Invoker.invokeUnchecked(result); 836 } else { 837 Invoker.invoke(result); 838 } 839 } 840 841 @Override failed(int error, IOException x)842 public void failed(int error, IOException x) { 843 // return direct buffer to cache if substituted 844 releaseBuffers(); 845 846 // release waiters if not already released by timeout 847 if (!isOpen()) 848 x = new AsynchronousCloseException(); 849 850 synchronized (result) { 851 if (result.isDone()) 852 return; 853 enableWriting(); 854 result.setFailure(x); 855 } 856 Invoker.invoke(result); 857 } 858 859 /** 860 * Invoked if timeout expires before it is cancelled 861 */ timeout()862 void timeout() { 863 // synchronize on result as the I/O could complete/fail 864 synchronized (result) { 865 if (result.isDone()) 866 return; 867 868 // kill further writing before releasing waiters 869 enableWriting(true); 870 result.setFailure(new InterruptedByTimeoutException()); 871 } 872 873 // invoke handler without any locks 874 Invoker.invoke(result); 875 } 876 } 877 878 @Override implWrite(boolean gatheringWrite, ByteBuffer src, ByteBuffer[] srcs, long timeout, TimeUnit unit, A attachment, CompletionHandler<V,? super A> handler)879 <V extends Number,A> Future<V> implWrite(boolean gatheringWrite, 880 ByteBuffer src, 881 ByteBuffer[] srcs, 882 long timeout, 883 TimeUnit unit, 884 A attachment, 885 CompletionHandler<V,? super A> handler) 886 { 887 // setup task 888 PendingFuture<V,A> result = 889 new PendingFuture<V,A>(this, handler, attachment); 890 ByteBuffer[] bufs; 891 if (gatheringWrite) { 892 bufs = srcs; 893 } else { 894 bufs = new ByteBuffer[1]; 895 bufs[0] = src; 896 } 897 final WriteTask<V,A> writeTask = 898 new WriteTask<V,A>(bufs, gatheringWrite, result); 899 result.setContext(writeTask); 900 901 // schedule timeout 902 if (timeout > 0L) { 903 Future<?> timeoutTask = iocp.schedule(new Runnable() { 904 public void run() { 905 writeTask.timeout(); 906 } 907 }, timeout, unit); 908 result.setTimeoutTask(timeoutTask); 909 } 910 911 // initiate I/O 912 writeTask.run(); 913 return result; 914 } 915 916 // -- Native methods -- 917 initIDs()918 private static native void initIDs(); 919 connect0(long socket, boolean preferIPv6, InetAddress remote, int remotePort, long overlapped)920 private static native int connect0(long socket, boolean preferIPv6, 921 InetAddress remote, int remotePort, long overlapped) throws IOException; 922 updateConnectContext(long socket)923 private static native void updateConnectContext(long socket) throws IOException; 924 read0(long socket, int count, long addres, long overlapped)925 private static native int read0(long socket, int count, long addres, long overlapped) 926 throws IOException; 927 write0(long socket, int count, long address, long overlapped)928 private static native int write0(long socket, int count, long address, 929 long overlapped) throws IOException; 930 shutdown0(long socket, int how)931 private static native void shutdown0(long socket, int how) throws IOException; 932 closesocket0(long socket)933 private static native void closesocket0(long socket) throws IOException; 934 935 static { IOUtil.load()936 IOUtil.load(); initIDs()937 initIDs(); 938 } 939 } 940