1 /* 2 * Copyright (c) 2008, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.*; 29 import java.nio.ByteBuffer; 30 import java.net.*; 31 import java.util.concurrent.*; 32 import java.io.IOException; 33 import java.io.FileDescriptor; 34 35 import sun.net.ConnectionResetException; 36 import sun.net.NetHooks; 37 import sun.net.util.SocketExceptions; 38 import sun.security.action.GetPropertyAction; 39 40 /** 41 * Unix implementation of AsynchronousSocketChannel 42 */ 43 44 class UnixAsynchronousSocketChannelImpl 45 extends AsynchronousSocketChannelImpl implements Port.PollableChannel 46 { 47 private static final NativeDispatcher nd = new SocketDispatcher(); 48 private static enum OpType { CONNECT, READ, WRITE }; 49 50 private static final boolean disableSynchronousRead; 51 static { 52 String propValue = GetPropertyAction.privilegedGetProperty( 53 "sun.nio.ch.disableSynchronousRead", "false"); 54 disableSynchronousRead = propValue.isEmpty() ? 55 true : Boolean.parseBoolean(propValue); 56 } 57 58 private final Port port; 59 private final int fdVal; 60 61 // used to ensure that the context for I/O operations that complete 62 // ascynrhonously is visible to the pooled threads handling I/O events. 63 private final Object updateLock = new Object(); 64 65 // pending connect (updateLock) 66 private boolean connectPending; 67 private CompletionHandler<Void,Object> connectHandler; 68 private Object connectAttachment; 69 private PendingFuture<Void,Object> connectFuture; 70 71 // pending remote address (stateLock) 72 private SocketAddress pendingRemote; 73 74 // pending read (updateLock) 75 private boolean readPending; 76 private boolean isScatteringRead; 77 private ByteBuffer readBuffer; 78 private ByteBuffer[] readBuffers; 79 private CompletionHandler<Number,Object> readHandler; 80 private Object readAttachment; 81 private PendingFuture<Number,Object> readFuture; 82 private Future<?> readTimer; 83 84 // pending write (updateLock) 85 private boolean writePending; 86 private boolean isGatheringWrite; 87 private ByteBuffer writeBuffer; 88 private ByteBuffer[] writeBuffers; 89 private CompletionHandler<Number,Object> writeHandler; 90 private Object writeAttachment; 91 private PendingFuture<Number,Object> writeFuture; 92 private Future<?> writeTimer; 93 94 UnixAsynchronousSocketChannelImpl(Port port)95 UnixAsynchronousSocketChannelImpl(Port port) 96 throws IOException 97 { 98 super(port); 99 100 // set non-blocking 101 try { 102 IOUtil.configureBlocking(fd, false); 103 } catch (IOException x) { 104 nd.close(fd); 105 throw x; 106 } 107 108 this.port = port; 109 this.fdVal = IOUtil.fdVal(fd); 110 111 // add mapping from file descriptor to this channel 112 port.register(fdVal, this); 113 } 114 115 // Constructor for sockets created by UnixAsynchronousServerSocketChannelImpl UnixAsynchronousSocketChannelImpl(Port port, FileDescriptor fd, InetSocketAddress remote)116 UnixAsynchronousSocketChannelImpl(Port port, 117 FileDescriptor fd, 118 InetSocketAddress remote) 119 throws IOException 120 { 121 super(port, fd, remote); 122 123 this.fdVal = IOUtil.fdVal(fd); 124 IOUtil.configureBlocking(fd, false); 125 126 try { 127 port.register(fdVal, this); 128 } catch (ShutdownChannelGroupException x) { 129 // ShutdownChannelGroupException thrown if we attempt to register a 130 // new channel after the group is shutdown 131 throw new IOException(x); 132 } 133 134 this.port = port; 135 } 136 137 @Override group()138 public AsynchronousChannelGroupImpl group() { 139 return port; 140 } 141 142 // register events for outstanding I/O operations, caller already owns updateLock updateEvents()143 private void updateEvents() { 144 assert Thread.holdsLock(updateLock); 145 int events = 0; 146 if (readPending) 147 events |= Net.POLLIN; 148 if (connectPending || writePending) 149 events |= Net.POLLOUT; 150 if (events != 0) 151 port.startPoll(fdVal, events); 152 } 153 154 // register events for outstanding I/O operations lockAndUpdateEvents()155 private void lockAndUpdateEvents() { 156 synchronized (updateLock) { 157 updateEvents(); 158 } 159 } 160 161 // invoke to finish read and/or write operations finish(boolean mayInvokeDirect, boolean readable, boolean writable)162 private void finish(boolean mayInvokeDirect, 163 boolean readable, 164 boolean writable) 165 { 166 boolean finishRead = false; 167 boolean finishWrite = false; 168 boolean finishConnect = false; 169 170 // map event to pending result 171 synchronized (updateLock) { 172 if (readable && this.readPending) { 173 this.readPending = false; 174 finishRead = true; 175 } 176 if (writable) { 177 if (this.writePending) { 178 this.writePending = false; 179 finishWrite = true; 180 } else if (this.connectPending) { 181 this.connectPending = false; 182 finishConnect = true; 183 } 184 } 185 } 186 187 // complete the I/O operation. Special case for when channel is 188 // ready for both reading and writing. In that case, submit task to 189 // complete write if write operation has a completion handler. 190 if (finishRead) { 191 if (finishWrite) 192 finishWrite(false); 193 finishRead(mayInvokeDirect); 194 return; 195 } 196 if (finishWrite) { 197 finishWrite(mayInvokeDirect); 198 } 199 if (finishConnect) { 200 finishConnect(mayInvokeDirect); 201 } 202 } 203 204 /** 205 * Invoked by event handler thread when file descriptor is polled 206 */ 207 @Override onEvent(int events, boolean mayInvokeDirect)208 public void onEvent(int events, boolean mayInvokeDirect) { 209 boolean readable = (events & Net.POLLIN) > 0; 210 boolean writable = (events & Net.POLLOUT) > 0; 211 if ((events & (Net.POLLERR | Net.POLLHUP)) > 0) { 212 readable = true; 213 writable = true; 214 } 215 finish(mayInvokeDirect, readable, writable); 216 } 217 218 @Override implClose()219 void implClose() throws IOException { 220 // remove the mapping 221 port.unregister(fdVal); 222 223 // close file descriptor 224 nd.close(fd); 225 226 // All outstanding I/O operations are required to fail 227 finish(false, true, true); 228 } 229 230 @Override onCancel(PendingFuture<?,?> task)231 public void onCancel(PendingFuture<?,?> task) { 232 if (task.getContext() == OpType.CONNECT) 233 killConnect(); 234 if (task.getContext() == OpType.READ) 235 killReading(); 236 if (task.getContext() == OpType.WRITE) 237 killWriting(); 238 } 239 240 // -- connect -- 241 setConnected()242 private void setConnected() throws IOException { 243 synchronized (stateLock) { 244 state = ST_CONNECTED; 245 localAddress = Net.localAddress(fd); 246 remoteAddress = (InetSocketAddress)pendingRemote; 247 } 248 } 249 finishConnect(boolean mayInvokeDirect)250 private void finishConnect(boolean mayInvokeDirect) { 251 Throwable e = null; 252 try { 253 begin(); 254 checkConnect(fdVal); 255 setConnected(); 256 } catch (Throwable x) { 257 if (x instanceof ClosedChannelException) 258 x = new AsynchronousCloseException(); 259 e = x; 260 } finally { 261 end(); 262 } 263 if (e != null) { 264 if (e instanceof IOException) { 265 var isa = (InetSocketAddress)pendingRemote; 266 e = SocketExceptions.of((IOException)e, isa); 267 } 268 // close channel if connection cannot be established 269 try { 270 close(); 271 } catch (Throwable suppressed) { 272 e.addSuppressed(suppressed); 273 } 274 } 275 276 // invoke handler and set result 277 CompletionHandler<Void,Object> handler = connectHandler; 278 connectHandler = null; 279 Object att = connectAttachment; 280 PendingFuture<Void,Object> future = connectFuture; 281 if (handler == null) { 282 future.setResult(null, e); 283 } else { 284 if (mayInvokeDirect) { 285 Invoker.invokeUnchecked(handler, att, null, e); 286 } else { 287 Invoker.invokeIndirectly(this, handler, att, null, e); 288 } 289 } 290 } 291 292 @Override 293 @SuppressWarnings("unchecked") implConnect(SocketAddress remote, A attachment, CompletionHandler<Void,? super A> handler)294 <A> Future<Void> implConnect(SocketAddress remote, 295 A attachment, 296 CompletionHandler<Void,? super A> handler) 297 { 298 if (!isOpen()) { 299 Throwable e = new ClosedChannelException(); 300 if (handler == null) { 301 return CompletedFuture.withFailure(e); 302 } else { 303 Invoker.invoke(this, handler, attachment, null, e); 304 return null; 305 } 306 } 307 308 InetSocketAddress isa = Net.checkAddress(remote); 309 310 // permission check 311 SecurityManager sm = System.getSecurityManager(); 312 if (sm != null) 313 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); 314 315 // check and set state 316 boolean notifyBeforeTcpConnect; 317 synchronized (stateLock) { 318 if (state == ST_CONNECTED) 319 throw new AlreadyConnectedException(); 320 if (state == ST_PENDING) 321 throw new ConnectionPendingException(); 322 state = ST_PENDING; 323 pendingRemote = remote; 324 notifyBeforeTcpConnect = (localAddress == null); 325 } 326 327 Throwable e = null; 328 try { 329 begin(); 330 // notify hook if unbound 331 if (notifyBeforeTcpConnect) 332 NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort()); 333 int n = Net.connect(fd, isa.getAddress(), isa.getPort()); 334 if (n == IOStatus.UNAVAILABLE) { 335 // connection could not be established immediately 336 PendingFuture<Void,A> result = null; 337 synchronized (updateLock) { 338 if (handler == null) { 339 result = new PendingFuture<Void,A>(this, OpType.CONNECT); 340 this.connectFuture = (PendingFuture<Void,Object>)result; 341 } else { 342 this.connectHandler = (CompletionHandler<Void,Object>)handler; 343 this.connectAttachment = attachment; 344 } 345 this.connectPending = true; 346 updateEvents(); 347 } 348 return result; 349 } 350 setConnected(); 351 } catch (Throwable x) { 352 if (x instanceof ClosedChannelException) 353 x = new AsynchronousCloseException(); 354 e = x; 355 } finally { 356 end(); 357 } 358 359 // close channel if connect fails 360 if (e != null) { 361 if (e instanceof IOException) { 362 e = SocketExceptions.of((IOException)e, isa); 363 } 364 try { 365 close(); 366 } catch (Throwable suppressed) { 367 e.addSuppressed(suppressed); 368 } 369 } 370 if (handler == null) { 371 return CompletedFuture.withResult(null, e); 372 } else { 373 Invoker.invoke(this, handler, attachment, null, e); 374 return null; 375 } 376 } 377 378 // -- read -- 379 finishRead(boolean mayInvokeDirect)380 private void finishRead(boolean mayInvokeDirect) { 381 int n = -1; 382 Throwable exc = null; 383 384 // copy fields as we can't access them after reading is re-enabled. 385 boolean scattering = isScatteringRead; 386 CompletionHandler<Number,Object> handler = readHandler; 387 Object att = readAttachment; 388 PendingFuture<Number,Object> future = readFuture; 389 Future<?> timeout = readTimer; 390 391 try { 392 begin(); 393 394 if (scattering) { 395 n = (int)IOUtil.read(fd, readBuffers, nd); 396 } else { 397 n = IOUtil.read(fd, readBuffer, -1, nd); 398 } 399 if (n == IOStatus.UNAVAILABLE) { 400 // spurious wakeup, is this possible? 401 synchronized (updateLock) { 402 readPending = true; 403 } 404 return; 405 } 406 407 // allow objects to be GC'ed. 408 this.readBuffer = null; 409 this.readBuffers = null; 410 this.readAttachment = null; 411 this.readHandler = null; 412 413 // allow another read to be initiated 414 enableReading(); 415 416 } catch (Throwable x) { 417 enableReading(); 418 if (x instanceof ClosedChannelException) 419 x = new AsynchronousCloseException(); 420 if (x instanceof ConnectionResetException) 421 x = new IOException(x.getMessage()); 422 exc = x; 423 } finally { 424 // restart poll in case of concurrent write 425 if (!(exc instanceof AsynchronousCloseException)) 426 lockAndUpdateEvents(); 427 end(); 428 } 429 430 // cancel the associated timer 431 if (timeout != null) 432 timeout.cancel(false); 433 434 // create result 435 Number result = (exc != null) ? null : (scattering) ? 436 (Number)Long.valueOf(n) : (Number)Integer.valueOf(n); 437 438 // invoke handler or set result 439 if (handler == null) { 440 future.setResult(result, exc); 441 } else { 442 if (mayInvokeDirect) { 443 Invoker.invokeUnchecked(handler, att, result, exc); 444 } else { 445 Invoker.invokeIndirectly(this, handler, att, result, exc); 446 } 447 } 448 } 449 450 private Runnable readTimeoutTask = new Runnable() { 451 public void run() { 452 CompletionHandler<Number,Object> handler = null; 453 Object att = null; 454 PendingFuture<Number,Object> future = null; 455 456 synchronized (updateLock) { 457 if (!readPending) 458 return; 459 readPending = false; 460 handler = readHandler; 461 att = readAttachment; 462 future = readFuture; 463 } 464 465 // kill further reading before releasing waiters 466 enableReading(true); 467 468 // invoke handler or set result 469 Exception exc = new InterruptedByTimeoutException(); 470 if (handler == null) { 471 future.setFailure(exc); 472 } else { 473 AsynchronousChannel ch = UnixAsynchronousSocketChannelImpl.this; 474 Invoker.invokeIndirectly(ch, handler, att, null, exc); 475 } 476 } 477 }; 478 479 /** 480 * Initiates a read or scattering read operation 481 */ 482 @Override 483 @SuppressWarnings("unchecked") implRead(boolean isScatteringRead, ByteBuffer dst, ByteBuffer[] dsts, long timeout, TimeUnit unit, A attachment, CompletionHandler<V,? super A> handler)484 <V extends Number,A> Future<V> implRead(boolean isScatteringRead, 485 ByteBuffer dst, 486 ByteBuffer[] dsts, 487 long timeout, 488 TimeUnit unit, 489 A attachment, 490 CompletionHandler<V,? super A> handler) 491 { 492 // A synchronous read is not attempted if disallowed by system property 493 // or, we are using a fixed thread pool and the completion handler may 494 // not be invoked directly (because the thread is not a pooled thread or 495 // there are too many handlers on the stack). 496 Invoker.GroupAndInvokeCount myGroupAndInvokeCount = null; 497 boolean invokeDirect = false; 498 boolean attemptRead = false; 499 if (!disableSynchronousRead) { 500 if (handler == null) { 501 attemptRead = true; 502 } else { 503 myGroupAndInvokeCount = Invoker.getGroupAndInvokeCount(); 504 invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port); 505 // okay to attempt read with user thread pool 506 attemptRead = invokeDirect || !port.isFixedThreadPool(); 507 } 508 } 509 510 int n = IOStatus.UNAVAILABLE; 511 Throwable exc = null; 512 boolean pending = false; 513 514 try { 515 begin(); 516 517 if (attemptRead) { 518 if (isScatteringRead) { 519 n = (int)IOUtil.read(fd, dsts, nd); 520 } else { 521 n = IOUtil.read(fd, dst, -1, nd); 522 } 523 } 524 525 if (n == IOStatus.UNAVAILABLE) { 526 PendingFuture<V,A> result = null; 527 synchronized (updateLock) { 528 this.isScatteringRead = isScatteringRead; 529 this.readBuffer = dst; 530 this.readBuffers = dsts; 531 if (handler == null) { 532 this.readHandler = null; 533 result = new PendingFuture<V,A>(this, OpType.READ); 534 this.readFuture = (PendingFuture<Number,Object>)result; 535 this.readAttachment = null; 536 } else { 537 this.readHandler = (CompletionHandler<Number,Object>)handler; 538 this.readAttachment = attachment; 539 this.readFuture = null; 540 } 541 if (timeout > 0L) { 542 this.readTimer = port.schedule(readTimeoutTask, timeout, unit); 543 } 544 this.readPending = true; 545 updateEvents(); 546 } 547 pending = true; 548 return result; 549 } 550 } catch (Throwable x) { 551 if (x instanceof ClosedChannelException) 552 x = new AsynchronousCloseException(); 553 if (x instanceof ConnectionResetException) 554 x = new IOException(x.getMessage()); 555 exc = x; 556 } finally { 557 if (!pending) 558 enableReading(); 559 end(); 560 } 561 562 Number result = (exc != null) ? null : (isScatteringRead) ? 563 (Number)Long.valueOf(n) : (Number)Integer.valueOf(n); 564 565 // read completed immediately 566 if (handler != null) { 567 if (invokeDirect) { 568 Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc); 569 } else { 570 Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc); 571 } 572 return null; 573 } else { 574 return CompletedFuture.withResult((V)result, exc); 575 } 576 } 577 578 // -- write -- 579 finishWrite(boolean mayInvokeDirect)580 private void finishWrite(boolean mayInvokeDirect) { 581 int n = -1; 582 Throwable exc = null; 583 584 // copy fields as we can't access them after reading is re-enabled. 585 boolean gathering = this.isGatheringWrite; 586 CompletionHandler<Number,Object> handler = this.writeHandler; 587 Object att = this.writeAttachment; 588 PendingFuture<Number,Object> future = this.writeFuture; 589 Future<?> timer = this.writeTimer; 590 591 try { 592 begin(); 593 594 if (gathering) { 595 n = (int)IOUtil.write(fd, writeBuffers, nd); 596 } else { 597 n = IOUtil.write(fd, writeBuffer, -1, nd); 598 } 599 if (n == IOStatus.UNAVAILABLE) { 600 // spurious wakeup, is this possible? 601 synchronized (updateLock) { 602 writePending = true; 603 } 604 return; 605 } 606 607 // allow objects to be GC'ed. 608 this.writeBuffer = null; 609 this.writeBuffers = null; 610 this.writeAttachment = null; 611 this.writeHandler = null; 612 613 // allow another write to be initiated 614 enableWriting(); 615 616 } catch (Throwable x) { 617 enableWriting(); 618 if (x instanceof ClosedChannelException) 619 x = new AsynchronousCloseException(); 620 exc = x; 621 } finally { 622 // restart poll in case of concurrent write 623 if (!(exc instanceof AsynchronousCloseException)) 624 lockAndUpdateEvents(); 625 end(); 626 } 627 628 // cancel the associated timer 629 if (timer != null) 630 timer.cancel(false); 631 632 // create result 633 Number result = (exc != null) ? null : (gathering) ? 634 (Number)Long.valueOf(n) : (Number)Integer.valueOf(n); 635 636 // invoke handler or set result 637 if (handler == null) { 638 future.setResult(result, exc); 639 } else { 640 if (mayInvokeDirect) { 641 Invoker.invokeUnchecked(handler, att, result, exc); 642 } else { 643 Invoker.invokeIndirectly(this, handler, att, result, exc); 644 } 645 } 646 } 647 648 private Runnable writeTimeoutTask = new Runnable() { 649 public void run() { 650 CompletionHandler<Number,Object> handler = null; 651 Object att = null; 652 PendingFuture<Number,Object> future = null; 653 654 synchronized (updateLock) { 655 if (!writePending) 656 return; 657 writePending = false; 658 handler = writeHandler; 659 att = writeAttachment; 660 future = writeFuture; 661 } 662 663 // kill further writing before releasing waiters 664 enableWriting(true); 665 666 // invoke handler or set result 667 Exception exc = new InterruptedByTimeoutException(); 668 if (handler != null) { 669 Invoker.invokeIndirectly(UnixAsynchronousSocketChannelImpl.this, 670 handler, att, null, exc); 671 } else { 672 future.setFailure(exc); 673 } 674 } 675 }; 676 677 /** 678 * Initiates a read or scattering read operation 679 */ 680 @Override 681 @SuppressWarnings("unchecked") implWrite(boolean isGatheringWrite, ByteBuffer src, ByteBuffer[] srcs, long timeout, TimeUnit unit, A attachment, CompletionHandler<V,? super A> handler)682 <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite, 683 ByteBuffer src, 684 ByteBuffer[] srcs, 685 long timeout, 686 TimeUnit unit, 687 A attachment, 688 CompletionHandler<V,? super A> handler) 689 { 690 Invoker.GroupAndInvokeCount myGroupAndInvokeCount = 691 Invoker.getGroupAndInvokeCount(); 692 boolean invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port); 693 boolean attemptWrite = (handler == null) || invokeDirect || 694 !port.isFixedThreadPool(); // okay to attempt write with user thread pool 695 696 int n = IOStatus.UNAVAILABLE; 697 Throwable exc = null; 698 boolean pending = false; 699 700 try { 701 begin(); 702 703 if (attemptWrite) { 704 if (isGatheringWrite) { 705 n = (int)IOUtil.write(fd, srcs, nd); 706 } else { 707 n = IOUtil.write(fd, src, -1, nd); 708 } 709 } 710 711 if (n == IOStatus.UNAVAILABLE) { 712 PendingFuture<V,A> result = null; 713 synchronized (updateLock) { 714 this.isGatheringWrite = isGatheringWrite; 715 this.writeBuffer = src; 716 this.writeBuffers = srcs; 717 if (handler == null) { 718 this.writeHandler = null; 719 result = new PendingFuture<V,A>(this, OpType.WRITE); 720 this.writeFuture = (PendingFuture<Number,Object>)result; 721 this.writeAttachment = null; 722 } else { 723 this.writeHandler = (CompletionHandler<Number,Object>)handler; 724 this.writeAttachment = attachment; 725 this.writeFuture = null; 726 } 727 if (timeout > 0L) { 728 this.writeTimer = port.schedule(writeTimeoutTask, timeout, unit); 729 } 730 this.writePending = true; 731 updateEvents(); 732 } 733 pending = true; 734 return result; 735 } 736 } catch (Throwable x) { 737 if (x instanceof ClosedChannelException) 738 x = new AsynchronousCloseException(); 739 exc = x; 740 } finally { 741 if (!pending) 742 enableWriting(); 743 end(); 744 } 745 746 Number result = (exc != null) ? null : (isGatheringWrite) ? 747 (Number)Long.valueOf(n) : (Number)Integer.valueOf(n); 748 749 // write completed immediately 750 if (handler != null) { 751 if (invokeDirect) { 752 Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc); 753 } else { 754 Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc); 755 } 756 return null; 757 } else { 758 return CompletedFuture.withResult((V)result, exc); 759 } 760 } 761 762 // -- Native methods -- 763 checkConnect(int fdVal)764 private static native void checkConnect(int fdVal) throws IOException; 765 766 static { IOUtil.load()767 IOUtil.load(); 768 } 769 } 770