1 /* 2 * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.io.FileDescriptor; 29 import java.io.IOException; 30 import java.nio.ByteBuffer; 31 import java.nio.channels.AsynchronousCloseException; 32 import java.nio.channels.ClosedChannelException; 33 import java.nio.channels.NotYetConnectedException; 34 import java.nio.channels.Pipe; 35 import java.nio.channels.SelectionKey; 36 import java.nio.channels.spi.SelectorProvider; 37 import java.util.Objects; 38 import java.util.concurrent.locks.ReentrantLock; 39 40 class SinkChannelImpl 41 extends Pipe.SinkChannel 42 implements SelChImpl 43 { 44 // Used to make native read and write calls 45 private static final NativeDispatcher nd = new FileDispatcherImpl(); 46 47 // The file descriptor associated with this channel 48 private final FileDescriptor fd; 49 private final int fdVal; 50 51 // Lock held by current writing thread 52 private final ReentrantLock writeLock = new ReentrantLock(); 53 54 // Lock held by any thread that modifies the state fields declared below 55 // DO NOT invoke a blocking I/O operation while holding this lock! 56 private final Object stateLock = new Object(); 57 58 // -- The following fields are protected by stateLock 59 60 // Channel state 61 private static final int ST_INUSE = 0; 62 private static final int ST_CLOSING = 1; 63 private static final int ST_CLOSED = 2; 64 private int state; 65 66 // ID of native thread doing write, for signalling 67 private long thread; 68 69 // -- End of fields protected by stateLock 70 71 getFD()72 public FileDescriptor getFD() { 73 return fd; 74 } 75 getFDVal()76 public int getFDVal() { 77 return fdVal; 78 } 79 SinkChannelImpl(SelectorProvider sp, FileDescriptor fd)80 SinkChannelImpl(SelectorProvider sp, FileDescriptor fd) { 81 super(sp); 82 this.fd = fd; 83 this.fdVal = IOUtil.fdVal(fd); 84 } 85 86 /** 87 * Closes the write end of the pipe if there are no write operation in 88 * progress and the channel is not registered with a Selector. 89 */ tryClose()90 private boolean tryClose() throws IOException { 91 assert Thread.holdsLock(stateLock) && state == ST_CLOSING; 92 if (thread == 0 && !isRegistered()) { 93 state = ST_CLOSED; 94 nd.close(fd); 95 return true; 96 } else { 97 return false; 98 } 99 } 100 101 /** 102 * Invokes tryClose to attempt to close the write end of the pipe. 103 * 104 * This method is used for deferred closing by I/O and Selector operations. 105 */ tryFinishClose()106 private void tryFinishClose() { 107 try { 108 tryClose(); 109 } catch (IOException ignore) { } 110 } 111 112 /** 113 * Closes this channel when configured in blocking mode. 114 * 115 * If there is a write operation in progress then the write-end of the pipe 116 * is pre-closed and the writer is signalled, in which case the final close 117 * is deferred until the writer aborts. 118 */ implCloseBlockingMode()119 private void implCloseBlockingMode() throws IOException { 120 synchronized (stateLock) { 121 assert state < ST_CLOSING; 122 state = ST_CLOSING; 123 if (!tryClose()) { 124 long th = thread; 125 if (th != 0) { 126 nd.preClose(fd); 127 NativeThread.signal(th); 128 } 129 } 130 } 131 } 132 133 /** 134 * Closes this channel when configured in non-blocking mode. 135 * 136 * If the channel is registered with a Selector then the close is deferred 137 * until the channel is flushed from all Selectors. 138 */ 139 private void implCloseNonBlockingMode() throws IOException { 140 synchronized (stateLock) { 141 assert state < ST_CLOSING; 142 state = ST_CLOSING; 143 } 144 // wait for any write operation to complete before trying to close 145 writeLock.lock(); 146 writeLock.unlock(); 147 synchronized (stateLock) { 148 if (state == ST_CLOSING) { 149 tryClose(); 150 } 151 } 152 } 153 154 /** 155 * Invoked by implCloseChannel to close the channel. 156 */ 157 @Override 158 protected void implCloseSelectableChannel() throws IOException { 159 assert !isOpen(); 160 if (isBlocking()) { 161 implCloseBlockingMode(); 162 } else { 163 implCloseNonBlockingMode(); 164 } 165 } 166 167 @Override 168 public void kill() { 169 synchronized (stateLock) { 170 if (state == ST_CLOSING) { 171 tryFinishClose(); 172 } 173 } 174 } 175 176 @Override 177 protected void implConfigureBlocking(boolean block) throws IOException { 178 writeLock.lock(); 179 try { 180 synchronized (stateLock) { 181 if (!isOpen()) 182 throw new ClosedChannelException(); 183 IOUtil.configureBlocking(fd, block); 184 } 185 } finally { 186 writeLock.unlock(); 187 } 188 } 189 190 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) { 191 int intOps = ski.nioInterestOps(); 192 int oldOps = ski.nioReadyOps(); 193 int newOps = initialOps; 194 195 if ((ops & Net.POLLNVAL) != 0) 196 throw new Error("POLLNVAL detected"); 197 198 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) { 199 newOps = intOps; 200 ski.nioReadyOps(newOps); 201 return (newOps & ~oldOps) != 0; 202 } 203 204 if (((ops & Net.POLLOUT) != 0) && 205 ((intOps & SelectionKey.OP_WRITE) != 0)) 206 newOps |= SelectionKey.OP_WRITE; 207 208 ski.nioReadyOps(newOps); 209 return (newOps & ~oldOps) != 0; 210 } 211 212 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) { 213 return translateReadyOps(ops, ski.nioReadyOps(), ski); 214 } 215 216 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) { 217 return translateReadyOps(ops, 0, ski); 218 } 219 220 public int translateInterestOps(int ops) { 221 int newOps = 0; 222 if (ops == SelectionKey.OP_WRITE) 223 newOps |= Net.POLLOUT; 224 return newOps; 225 } 226 227 /** 228 * Marks the beginning of a write operation that might block. 229 * 230 * @throws ClosedChannelException if the channel is closed 231 * @throws NotYetConnectedException if the channel is not yet connected 232 */ 233 private void beginWrite(boolean blocking) throws ClosedChannelException { 234 if (blocking) { 235 // set hook for Thread.interrupt 236 begin(); 237 } 238 synchronized (stateLock) { 239 if (!isOpen()) 240 throw new ClosedChannelException(); 241 if (blocking) 242 thread = NativeThread.current(); 243 } 244 } 245 246 /** 247 * Marks the end of a write operation that may have blocked. 248 * 249 * @throws AsynchronousCloseException if the channel was closed due to this 250 * thread being interrupted on a blocking write operation. 251 */ 252 private void endWrite(boolean blocking, boolean completed) 253 throws AsynchronousCloseException 254 { 255 if (blocking) { 256 synchronized (stateLock) { 257 thread = 0; 258 if (state == ST_CLOSING) { 259 tryFinishClose(); 260 } 261 } 262 // remove hook for Thread.interrupt 263 end(completed); 264 } 265 } 266 267 @Override 268 public int write(ByteBuffer src) throws IOException { 269 Objects.requireNonNull(src); 270 271 writeLock.lock(); 272 try { 273 boolean blocking = isBlocking(); 274 int n = 0; 275 try { 276 beginWrite(blocking); 277 n = IOUtil.write(fd, src, -1, nd); 278 if (blocking) { 279 while (IOStatus.okayToRetry(n) && isOpen()) { 280 park(Net.POLLOUT); 281 n = IOUtil.write(fd, src, -1, nd); 282 } 283 } 284 } finally { 285 endWrite(blocking, n > 0); 286 assert IOStatus.check(n); 287 } 288 return IOStatus.normalize(n); 289 } finally { 290 writeLock.unlock(); 291 } 292 } 293 294 @Override 295 public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { 296 Objects.checkFromIndexSize(offset, length, srcs.length); 297 298 writeLock.lock(); 299 try { 300 boolean blocking = isBlocking(); 301 long n = 0; 302 try { 303 beginWrite(blocking); 304 n = IOUtil.write(fd, srcs, offset, length, nd); 305 if (blocking) { 306 while (IOStatus.okayToRetry(n) && isOpen()) { 307 park(Net.POLLOUT); 308 n = IOUtil.write(fd, srcs, offset, length, nd); 309 } 310 } 311 } finally { 312 endWrite(blocking, n > 0); 313 assert IOStatus.check(n); 314 } 315 return IOStatus.normalize(n); 316 } finally { writeLock.unlock()317 writeLock.unlock(); 318 } 319 } 320 321 @Override write(ByteBuffer[] srcs)322 public long write(ByteBuffer[] srcs) throws IOException { 323 return write(srcs, 0, srcs.length); 324 } 325 } 326