1 /**
2  * The utillib library.
3  * More information is available at http://www.jinchess.com/.
4  * Copyright (C) 2002 Alexander Maryanovsky.
5  * All rights reserved.
6  *
7  * The utillib library is free software; you can redistribute
8  * it and/or modify it under the terms of the GNU Lesser General Public License
9  * as published by the Free Software Foundation; either version 2 of the
10  * License, or (at your option) any later version.
11  *
12  * The utillib library is distributed in the hope that it will
13  * be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
15  * General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with utillib library; if not, write to the Free Software
19  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20  */
21 
22 package free.util;
23 
24 import java.io.IOException;
25 import java.io.InterruptedIOException;
26 
27 /**
28  * This class is responsible for creating and managing pairs of PipedStreams.
29  * A single instance of this class consists of one PipedInputStream and one
30  * PipedOutputStream connected to each other.
31  */
32 
33 public class PipedStreams{
34 
35 
36   /**
37    * The default buffer size.
38    */
39 
40   private static final int DEFAULT_BUFFER_SIZE = 2048;
41 
42 
43 
44   /**
45    * The PipedInputStream.
46    */
47 
48   private final PipedInputStream in;
49 
50 
51 
52   /**
53    * The PipedOutputStream.
54    */
55 
56   private final PipedOutputStream out;
57 
58 
59 
60   /**
61    * The value of the soTimeout.
62    */
63 
64   private volatile int soTimeout = 0;
65 
66 
67 
68   /**
69    * The buffer.
70    */
71 
72   private byte [] buf;
73 
74 
75 
76   /**
77    * Whether the buffer is allowed to grow.
78    */
79 
80   private final boolean growBuf;
81 
82 
83 
84 
85   /**
86    * The index of the byte that will be read next.
87    */
88 
89   private int readIndex = 0;
90 
91 
92 
93   /**
94    * The index of the byte that that will be written next.
95    */
96 
97   private int writeIndex = 0;
98 
99 
100 
101   /**
102    * Becomes true when the PipedOutputStream gets closed.
103    */
104 
105   private boolean writerClosed = false;
106 
107 
108 
109   /**
110    * Becomes true when the PipedInputStream gets closed.
111    */
112 
113   private boolean readerClosed = false;
114 
115 
116 
117   /**
118    * The lock protecting writing.
119    */
120 
121   private Object writeLock = new String("Write Lock for PipedStreams");
122 
123 
124 
125   /**
126    * The lock protecting reading.
127    */
128 
129   private Object readLock = new String("Read Lock for PipedStream");
130 
131 
132 
133   /**
134    * Creates new <code>PipedStreams</code>.
135    */
136 
PipedStreams()137   public PipedStreams(){
138     this(2048, false);
139   }
140 
141 
142 
143   /**
144    * Creates new <code>PipedStreams</code> with the specified buffer size. Once
145    * the specified amount of bytes have been written into the,
146    * <code>OutputStream</code> attempting to write more data will block until
147    * enough data has been read to allow writing into the buffer again.
148    */
149 
PipedStreams(int bufSize)150   public PipedStreams(int bufSize){
151     this(bufSize, false);
152   }
153 
154 
155 
156 
157   /**
158    * Creates new <code>PipedStreams</code>. If <code>growBuf</code> is
159    * <code>true</code>, the internal buffer will be grown indefinitely when more
160    * space is required for the written data. This means that writing into the
161    * <code>OutputStream</code> will never block. Note that there is currently no
162    * mechanism to cause the internal buffer to shrink.
163    */
164 
PipedStreams(boolean growBuf)165   public PipedStreams(boolean growBuf){
166     this(DEFAULT_BUFFER_SIZE, growBuf);
167   }
168 
169 
170 
171 
172   /**
173    * Creates new <code>PipedStreams</code> with the specified initial buffer
174    * size, potentially allowing the buffer to grow indefinitely.
175    */
176 
PipedStreams(int bufSize, boolean growBuf)177   public PipedStreams(int bufSize, boolean growBuf){
178     if (bufSize <= 0)
179       throw new IllegalArgumentException("The buffer size must be a positive integer");
180 
181     in = new PipedInputStream(this);
182     out = new PipedOutputStream(this);
183 
184     this.growBuf = growBuf;
185     this.buf = new byte[bufSize];
186   }
187 
188 
189 
190 
191   /**
192    * Sets the SO_TIMEOUT for the PipedInputStream. A read operation on the
193    * PipedInputStream will only block for the given amount of milliseconds, after
194    * that, an InterruptedIOException will be thrown, but the streams will remain
195    * valid. A value of 0 implies this option is off (read() can block indefinitely).
196    * NOTE: This method should not be called while a read() operation is in progress
197    * (it will block until the read() is done, which may be a long time, or never).
198    */
199 
setSoTimeout(int timeout)200   public void setSoTimeout(int timeout){
201     synchronized(readLock){ // Don't modify this while a read is in progress.
202       soTimeout = timeout;
203     }
204   }
205 
206 
207 
208 
209   /**
210    * Returns the value of SO_TIMEOUT. A value of 0 implies this option is off
211    * (read() can block indefinitely).
212    */
213 
getSoTimeout()214   public int getSoTimeout(){
215     return soTimeout;
216   }
217 
218 
219 
220 
221 
222   /**
223    * Returns the PipedInputStream.
224    */
225 
getInputStream()226   public PipedInputStream getInputStream(){
227     return in;
228   }
229 
230 
231   /**
232    * Returns the PipedOutputStream.
233    */
234 
getOutputStream()235   public PipedOutputStream getOutputStream(){
236     return out;
237   }
238 
239 
240 
241   /**
242    * Returns the amount of bytes available to be read immediately (without
243    * blocking) by the PipedInputStream.
244    */
245 
available()246   synchronized int available(){
247     if (readerClosed)
248       return 0;
249 
250     return availableImpl();
251   }
252 
253 
254 
255 
256   /**
257    * Returns the amount of bytes available to be read immediately (without
258    * blocking) by the PipedInputStream.
259    */
260 
availableImpl()261   private int availableImpl(){
262     if (writeIndex >= readIndex) // On the same lap.
263       return writeIndex - readIndex;
264     else // On different laps.
265       return writeIndex + buf.length - readIndex;
266   }
267 
268 
269 
270 
271   /**
272    * Returns the amount of bytes that can be written into the buffer without
273    * blocking.
274    */
275 
availableSpace()276   private int availableSpace(){
277     return buf.length - availableImpl() - 1;
278   }
279 
280 
281 
282 
283   /**
284    * Increases the size of the internal buffer by at least the specified amount
285    * of bytes. The caller must take care of proper synchronization.
286    */
287 
growBuf(int minGrowSize)288   private void growBuf(int minGrowSize){
289     int growSize = minGrowSize < buf.length ? buf.length : minGrowSize;
290     byte [] newBuf = new byte[buf.length + growSize];
291     System.arraycopy(buf, 0, newBuf, 0, buf.length);
292     buf = newBuf;
293   }
294 
295 
296 
297 
298   /**
299    * Writes a single byte to the buffer. Note that this method can block if the
300    * buffer is full.
301    */
302 
303   synchronized void write(int b) throws IOException{
304     synchronized(writeLock){
305       if (readerClosed || writerClosed)
306         throw new IOException("Stream closed");
307 
308       while (availableSpace() == 0){
309         if (growBuf)
310           growBuf(1);
311         else try{
312           wait();
313         } catch (InterruptedException e){
314             throw new InterruptedIOException();
315           }
316       }
317 
318       if (readerClosed || writerClosed)
319         throw new IOException("Stream closed");
320 
321       buf[writeIndex++] = (byte)(b&0xff);
322       if (writeIndex == buf.length)
323         writeIndex = 0;
324 
325       notifyAll();
326     }
327   }
328 
329 
330 
331   /**
332    * Writes bytes according to the contract of OutputStream.write(byte [], int, int)
333    * with the only difference that this might block if the buffer is full.
334    */
335 
336   synchronized void write(byte [] arr, int offset, int length) throws IOException{
337     synchronized(writeLock){
338       if (readerClosed||writerClosed)
339         throw new IOException("Stream closed");
340 
341       if (growBuf && (length > availableSpace()))
342         growBuf(length - availableSpace());
343 
344       while(length > 0){
345         while (availableSpace() == 0){
346           try{
347             wait();
348           } catch (InterruptedException e){
349               throw new InterruptedIOException();
350             }
351         }
352 
353         int availableSpace = availableSpace();
354 
355         int amountToWrite = length > availableSpace ? availableSpace : length;
356         int part1Size = buf.length-writeIndex >= amountToWrite ? amountToWrite : buf.length-writeIndex;
357         int part2Size = amountToWrite-part1Size > 0 ? amountToWrite - part1Size : 0;
358 
359         System.arraycopy(arr, offset, buf, writeIndex, part1Size);
360         System.arraycopy(arr, offset + part1Size, buf, 0, part2Size);
361 
362         offset += amountToWrite;
363         length -= amountToWrite;
364 
365         writeIndex = (writeIndex + amountToWrite) % buf.length;
366 
367         notifyAll();
368       }
369     }
370   }
371 
372 
373 
374   /**
375    * Reads a single byte from the buffer according to the contract of
376    * InputStream.read().
377    */
378 
read()379   synchronized int read() throws IOException{
380     synchronized(readLock){
381       if (readerClosed)
382         throw new IOException("Stream closed");
383 
384       final long startedWaitingTS = System.currentTimeMillis();
385       while (available() == 0){
386         if (writerClosed)
387           return -1;
388 
389         long curTime = System.currentTimeMillis();
390         if ((soTimeout != 0) && (curTime - startedWaitingTS >= soTimeout))
391           throw new InterruptedIOException();
392 
393         try{
394           if (soTimeout == 0)
395             wait();
396           else{
397             wait(soTimeout + curTime - startedWaitingTS);
398           }
399         } catch (InterruptedException e){
400             throw new InterruptedIOException();
401           }
402         if (readerClosed)
403           throw new IOException("Stream closed");
404 
405       }
406 
407       int b = buf[readIndex++];
408       if (readIndex == buf.length)
409         readIndex = 0;
410 
411       notifyAll();
412 
413       return b < 0 ? b+256 : b;
414     }
415   }
416 
417 
418 
419 
420   /**
421    * Reads bytes from the buffer into the given array according to the contract
422    * of InputStream.read(byte [], int, int)
423    */
424 
read(byte [] arr, int offset, int length)425   synchronized int read(byte [] arr, int offset, int length) throws IOException{
426     synchronized(readLock){
427       if (readerClosed)
428         throw new IOException("Stream closed");
429 
430       final long startedWaitingTS = System.currentTimeMillis();
431       while (available() == 0){
432         if (writerClosed)
433           return -1;
434 
435         long curTime = System.currentTimeMillis();
436         if ((soTimeout != 0) && (curTime - startedWaitingTS >= soTimeout))
437           throw new InterruptedIOException();
438 
439         try{
440           if (soTimeout == 0)
441             wait();
442           else{
443             wait(soTimeout + curTime - startedWaitingTS);
444           }
445         } catch (InterruptedException e){
446             throw new InterruptedIOException();
447           }
448         if (readerClosed)
449           throw new IOException("Stream closed");
450       }
451 
452       int available = available();
453       int amountToRead = length > available ? available : length;
454       int part1Size = buf.length-readIndex > amountToRead ? amountToRead : buf.length-readIndex;
455       int part2Size = amountToRead-part1Size > 0 ? amountToRead-part1Size : 0;
456 
457       System.arraycopy(buf, readIndex, arr, offset, part1Size);
458       System.arraycopy(buf, 0, arr, offset + part1Size, part2Size);
459 
460       readIndex = (readIndex + amountToRead) % buf.length;
461 
462       notifyAll();
463 
464       return amountToRead;
465     }
466   }
467 
468 
469 
470   /**
471    * Closes down the streams as far as the PipedOutputStream is concerned.
472    */
473 
closeWriter()474   synchronized void closeWriter(){
475     if (writerClosed)
476       throw new IllegalStateException("Already closed");
477     writerClosed = true;
478     notifyAll();
479   }
480 
481 
482 
483   /**
484    * Closes down the streams as far as the PipedInputStream is concerned
485    */
486 
closeReader()487   synchronized void closeReader(){
488     if (readerClosed)
489       throw new IllegalStateException("Already closed");
490 
491     readerClosed = true;
492     notifyAll();
493   }
494 
495 }
496