1 /** 2 * The utillib library. 3 * More information is available at http://www.jinchess.com/. 4 * Copyright (C) 2002 Alexander Maryanovsky. 5 * All rights reserved. 6 * 7 * The utillib library is free software; you can redistribute 8 * it and/or modify it under the terms of the GNU Lesser General Public License 9 * as published by the Free Software Foundation; either version 2 of the 10 * License, or (at your option) any later version. 11 * 12 * The utillib library is distributed in the hope that it will 13 * be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of 14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser 15 * General Public License for more details. 16 * 17 * You should have received a copy of the GNU Lesser General Public License 18 * along with utillib library; if not, write to the Free Software 19 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 20 */ 21 22 package free.util; 23 24 import java.io.IOException; 25 import java.io.InterruptedIOException; 26 27 /** 28 * This class is responsible for creating and managing pairs of PipedStreams. 29 * A single instance of this class consists of one PipedInputStream and one 30 * PipedOutputStream connected to each other. 31 */ 32 33 public class PipedStreams{ 34 35 36 /** 37 * The default buffer size. 38 */ 39 40 private static final int DEFAULT_BUFFER_SIZE = 2048; 41 42 43 44 /** 45 * The PipedInputStream. 46 */ 47 48 private final PipedInputStream in; 49 50 51 52 /** 53 * The PipedOutputStream. 54 */ 55 56 private final PipedOutputStream out; 57 58 59 60 /** 61 * The value of the soTimeout. 62 */ 63 64 private volatile int soTimeout = 0; 65 66 67 68 /** 69 * The buffer. 70 */ 71 72 private byte [] buf; 73 74 75 76 /** 77 * Whether the buffer is allowed to grow. 78 */ 79 80 private final boolean growBuf; 81 82 83 84 85 /** 86 * The index of the byte that will be read next. 87 */ 88 89 private int readIndex = 0; 90 91 92 93 /** 94 * The index of the byte that that will be written next. 95 */ 96 97 private int writeIndex = 0; 98 99 100 101 /** 102 * Becomes true when the PipedOutputStream gets closed. 103 */ 104 105 private boolean writerClosed = false; 106 107 108 109 /** 110 * Becomes true when the PipedInputStream gets closed. 111 */ 112 113 private boolean readerClosed = false; 114 115 116 117 /** 118 * The lock protecting writing. 119 */ 120 121 private Object writeLock = new String("Write Lock for PipedStreams"); 122 123 124 125 /** 126 * The lock protecting reading. 127 */ 128 129 private Object readLock = new String("Read Lock for PipedStream"); 130 131 132 133 /** 134 * Creates new <code>PipedStreams</code>. 135 */ 136 PipedStreams()137 public PipedStreams(){ 138 this(2048, false); 139 } 140 141 142 143 /** 144 * Creates new <code>PipedStreams</code> with the specified buffer size. Once 145 * the specified amount of bytes have been written into the, 146 * <code>OutputStream</code> attempting to write more data will block until 147 * enough data has been read to allow writing into the buffer again. 148 */ 149 PipedStreams(int bufSize)150 public PipedStreams(int bufSize){ 151 this(bufSize, false); 152 } 153 154 155 156 157 /** 158 * Creates new <code>PipedStreams</code>. If <code>growBuf</code> is 159 * <code>true</code>, the internal buffer will be grown indefinitely when more 160 * space is required for the written data. This means that writing into the 161 * <code>OutputStream</code> will never block. Note that there is currently no 162 * mechanism to cause the internal buffer to shrink. 163 */ 164 PipedStreams(boolean growBuf)165 public PipedStreams(boolean growBuf){ 166 this(DEFAULT_BUFFER_SIZE, growBuf); 167 } 168 169 170 171 172 /** 173 * Creates new <code>PipedStreams</code> with the specified initial buffer 174 * size, potentially allowing the buffer to grow indefinitely. 175 */ 176 PipedStreams(int bufSize, boolean growBuf)177 public PipedStreams(int bufSize, boolean growBuf){ 178 if (bufSize <= 0) 179 throw new IllegalArgumentException("The buffer size must be a positive integer"); 180 181 in = new PipedInputStream(this); 182 out = new PipedOutputStream(this); 183 184 this.growBuf = growBuf; 185 this.buf = new byte[bufSize]; 186 } 187 188 189 190 191 /** 192 * Sets the SO_TIMEOUT for the PipedInputStream. A read operation on the 193 * PipedInputStream will only block for the given amount of milliseconds, after 194 * that, an InterruptedIOException will be thrown, but the streams will remain 195 * valid. A value of 0 implies this option is off (read() can block indefinitely). 196 * NOTE: This method should not be called while a read() operation is in progress 197 * (it will block until the read() is done, which may be a long time, or never). 198 */ 199 setSoTimeout(int timeout)200 public void setSoTimeout(int timeout){ 201 synchronized(readLock){ // Don't modify this while a read is in progress. 202 soTimeout = timeout; 203 } 204 } 205 206 207 208 209 /** 210 * Returns the value of SO_TIMEOUT. A value of 0 implies this option is off 211 * (read() can block indefinitely). 212 */ 213 getSoTimeout()214 public int getSoTimeout(){ 215 return soTimeout; 216 } 217 218 219 220 221 222 /** 223 * Returns the PipedInputStream. 224 */ 225 getInputStream()226 public PipedInputStream getInputStream(){ 227 return in; 228 } 229 230 231 /** 232 * Returns the PipedOutputStream. 233 */ 234 getOutputStream()235 public PipedOutputStream getOutputStream(){ 236 return out; 237 } 238 239 240 241 /** 242 * Returns the amount of bytes available to be read immediately (without 243 * blocking) by the PipedInputStream. 244 */ 245 available()246 synchronized int available(){ 247 if (readerClosed) 248 return 0; 249 250 return availableImpl(); 251 } 252 253 254 255 256 /** 257 * Returns the amount of bytes available to be read immediately (without 258 * blocking) by the PipedInputStream. 259 */ 260 availableImpl()261 private int availableImpl(){ 262 if (writeIndex >= readIndex) // On the same lap. 263 return writeIndex - readIndex; 264 else // On different laps. 265 return writeIndex + buf.length - readIndex; 266 } 267 268 269 270 271 /** 272 * Returns the amount of bytes that can be written into the buffer without 273 * blocking. 274 */ 275 availableSpace()276 private int availableSpace(){ 277 return buf.length - availableImpl() - 1; 278 } 279 280 281 282 283 /** 284 * Increases the size of the internal buffer by at least the specified amount 285 * of bytes. The caller must take care of proper synchronization. 286 */ 287 growBuf(int minGrowSize)288 private void growBuf(int minGrowSize){ 289 int growSize = minGrowSize < buf.length ? buf.length : minGrowSize; 290 byte [] newBuf = new byte[buf.length + growSize]; 291 System.arraycopy(buf, 0, newBuf, 0, buf.length); 292 buf = newBuf; 293 } 294 295 296 297 298 /** 299 * Writes a single byte to the buffer. Note that this method can block if the 300 * buffer is full. 301 */ 302 303 synchronized void write(int b) throws IOException{ 304 synchronized(writeLock){ 305 if (readerClosed || writerClosed) 306 throw new IOException("Stream closed"); 307 308 while (availableSpace() == 0){ 309 if (growBuf) 310 growBuf(1); 311 else try{ 312 wait(); 313 } catch (InterruptedException e){ 314 throw new InterruptedIOException(); 315 } 316 } 317 318 if (readerClosed || writerClosed) 319 throw new IOException("Stream closed"); 320 321 buf[writeIndex++] = (byte)(b&0xff); 322 if (writeIndex == buf.length) 323 writeIndex = 0; 324 325 notifyAll(); 326 } 327 } 328 329 330 331 /** 332 * Writes bytes according to the contract of OutputStream.write(byte [], int, int) 333 * with the only difference that this might block if the buffer is full. 334 */ 335 336 synchronized void write(byte [] arr, int offset, int length) throws IOException{ 337 synchronized(writeLock){ 338 if (readerClosed||writerClosed) 339 throw new IOException("Stream closed"); 340 341 if (growBuf && (length > availableSpace())) 342 growBuf(length - availableSpace()); 343 344 while(length > 0){ 345 while (availableSpace() == 0){ 346 try{ 347 wait(); 348 } catch (InterruptedException e){ 349 throw new InterruptedIOException(); 350 } 351 } 352 353 int availableSpace = availableSpace(); 354 355 int amountToWrite = length > availableSpace ? availableSpace : length; 356 int part1Size = buf.length-writeIndex >= amountToWrite ? amountToWrite : buf.length-writeIndex; 357 int part2Size = amountToWrite-part1Size > 0 ? amountToWrite - part1Size : 0; 358 359 System.arraycopy(arr, offset, buf, writeIndex, part1Size); 360 System.arraycopy(arr, offset + part1Size, buf, 0, part2Size); 361 362 offset += amountToWrite; 363 length -= amountToWrite; 364 365 writeIndex = (writeIndex + amountToWrite) % buf.length; 366 367 notifyAll(); 368 } 369 } 370 } 371 372 373 374 /** 375 * Reads a single byte from the buffer according to the contract of 376 * InputStream.read(). 377 */ 378 read()379 synchronized int read() throws IOException{ 380 synchronized(readLock){ 381 if (readerClosed) 382 throw new IOException("Stream closed"); 383 384 final long startedWaitingTS = System.currentTimeMillis(); 385 while (available() == 0){ 386 if (writerClosed) 387 return -1; 388 389 long curTime = System.currentTimeMillis(); 390 if ((soTimeout != 0) && (curTime - startedWaitingTS >= soTimeout)) 391 throw new InterruptedIOException(); 392 393 try{ 394 if (soTimeout == 0) 395 wait(); 396 else{ 397 wait(soTimeout + curTime - startedWaitingTS); 398 } 399 } catch (InterruptedException e){ 400 throw new InterruptedIOException(); 401 } 402 if (readerClosed) 403 throw new IOException("Stream closed"); 404 405 } 406 407 int b = buf[readIndex++]; 408 if (readIndex == buf.length) 409 readIndex = 0; 410 411 notifyAll(); 412 413 return b < 0 ? b+256 : b; 414 } 415 } 416 417 418 419 420 /** 421 * Reads bytes from the buffer into the given array according to the contract 422 * of InputStream.read(byte [], int, int) 423 */ 424 read(byte [] arr, int offset, int length)425 synchronized int read(byte [] arr, int offset, int length) throws IOException{ 426 synchronized(readLock){ 427 if (readerClosed) 428 throw new IOException("Stream closed"); 429 430 final long startedWaitingTS = System.currentTimeMillis(); 431 while (available() == 0){ 432 if (writerClosed) 433 return -1; 434 435 long curTime = System.currentTimeMillis(); 436 if ((soTimeout != 0) && (curTime - startedWaitingTS >= soTimeout)) 437 throw new InterruptedIOException(); 438 439 try{ 440 if (soTimeout == 0) 441 wait(); 442 else{ 443 wait(soTimeout + curTime - startedWaitingTS); 444 } 445 } catch (InterruptedException e){ 446 throw new InterruptedIOException(); 447 } 448 if (readerClosed) 449 throw new IOException("Stream closed"); 450 } 451 452 int available = available(); 453 int amountToRead = length > available ? available : length; 454 int part1Size = buf.length-readIndex > amountToRead ? amountToRead : buf.length-readIndex; 455 int part2Size = amountToRead-part1Size > 0 ? amountToRead-part1Size : 0; 456 457 System.arraycopy(buf, readIndex, arr, offset, part1Size); 458 System.arraycopy(buf, 0, arr, offset + part1Size, part2Size); 459 460 readIndex = (readIndex + amountToRead) % buf.length; 461 462 notifyAll(); 463 464 return amountToRead; 465 } 466 } 467 468 469 470 /** 471 * Closes down the streams as far as the PipedOutputStream is concerned. 472 */ 473 closeWriter()474 synchronized void closeWriter(){ 475 if (writerClosed) 476 throw new IllegalStateException("Already closed"); 477 writerClosed = true; 478 notifyAll(); 479 } 480 481 482 483 /** 484 * Closes down the streams as far as the PipedInputStream is concerned 485 */ 486 closeReader()487 synchronized void closeReader(){ 488 if (readerClosed) 489 throw new IllegalStateException("Already closed"); 490 491 readerClosed = true; 492 notifyAll(); 493 } 494 495 } 496