1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with this
4  * work for additional information regarding copyright ownership. The ASF
5  * licenses this file to you under the Apache License, Version 2.0 (the
6  * "License"); you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14  * License for the specific language governing permissions and limitations under
15  * the License.
16  */
17 package org.apache.hadoop.io.file.tfile;
18 
19 import java.io.DataInputStream;
20 import java.io.DataOutputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 
25 /**
26  * Several related classes to support chunk-encoded sub-streams on top of a
27  * regular stream.
28  */
29 final class Chunk {
30 
31   /**
32    * Prevent the instantiation of class.
33    */
Chunk()34   private Chunk() {
35     // nothing
36   }
37 
38   /**
39    * Decoding a chain of chunks encoded through ChunkEncoder or
40    * SingleChunkEncoder.
41    */
42   static public class ChunkDecoder extends InputStream {
43     private DataInputStream in = null;
44     private boolean lastChunk;
45     private int remain = 0;
46     private boolean closed;
47 
ChunkDecoder()48     public ChunkDecoder() {
49       lastChunk = true;
50       closed = true;
51     }
52 
reset(DataInputStream downStream)53     public void reset(DataInputStream downStream) {
54       // no need to wind forward the old input.
55       in = downStream;
56       lastChunk = false;
57       remain = 0;
58       closed = false;
59     }
60 
61     /**
62      * Constructor
63      *
64      * @param in
65      *          The source input stream which contains chunk-encoded data
66      *          stream.
67      */
ChunkDecoder(DataInputStream in)68     public ChunkDecoder(DataInputStream in) {
69       this.in = in;
70       lastChunk = false;
71       closed = false;
72     }
73 
74     /**
75      * Have we reached the last chunk.
76      *
77      * @return true if we have reached the last chunk.
78      * @throws java.io.IOException
79      */
isLastChunk()80     public boolean isLastChunk() throws IOException {
81       checkEOF();
82       return lastChunk;
83     }
84 
85     /**
86      * How many bytes remain in the current chunk?
87      *
88      * @return remaining bytes left in the current chunk.
89      * @throws java.io.IOException
90      */
getRemain()91     public int getRemain() throws IOException {
92       checkEOF();
93       return remain;
94     }
95 
96     /**
97      * Reading the length of next chunk.
98      *
99      * @throws java.io.IOException
100      *           when no more data is available.
101      */
readLength()102     private void readLength() throws IOException {
103       remain = Utils.readVInt(in);
104       if (remain >= 0) {
105         lastChunk = true;
106       } else {
107         remain = -remain;
108       }
109     }
110 
111     /**
112      * Check whether we reach the end of the stream.
113      *
114      * @return false if the chunk encoded stream has more data to read (in which
115      *         case available() will be greater than 0); true otherwise.
116      * @throws java.io.IOException
117      *           on I/O errors.
118      */
checkEOF()119     private boolean checkEOF() throws IOException {
120       if (isClosed()) return true;
121       while (true) {
122         if (remain > 0) return false;
123         if (lastChunk) return true;
124         readLength();
125       }
126     }
127 
128     @Override
129     /*
130      * This method never blocks the caller. Returning 0 does not mean we reach
131      * the end of the stream.
132      */
available()133     public int available() {
134       return remain;
135     }
136 
137     @Override
read()138     public int read() throws IOException {
139       if (checkEOF()) return -1;
140       int ret = in.read();
141       if (ret < 0) throw new IOException("Corrupted chunk encoding stream");
142       --remain;
143       return ret;
144     }
145 
146     @Override
read(byte[] b)147     public int read(byte[] b) throws IOException {
148       return read(b, 0, b.length);
149     }
150 
151     @Override
read(byte[] b, int off, int len)152     public int read(byte[] b, int off, int len) throws IOException {
153       if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
154         throw new IndexOutOfBoundsException();
155       }
156 
157       if (!checkEOF()) {
158         int n = Math.min(remain, len);
159         int ret = in.read(b, off, n);
160         if (ret < 0) throw new IOException("Corrupted chunk encoding stream");
161         remain -= ret;
162         return ret;
163       }
164       return -1;
165     }
166 
167     @Override
skip(long n)168     public long skip(long n) throws IOException {
169       if (!checkEOF()) {
170         long ret = in.skip(Math.min(remain, n));
171         remain -= ret;
172         return ret;
173       }
174       return 0;
175     }
176 
177     @Override
markSupported()178     public boolean markSupported() {
179       return false;
180     }
181 
isClosed()182     public boolean isClosed() {
183       return closed;
184     }
185 
186     @Override
close()187     public void close() throws IOException {
188       if (closed == false) {
189         try {
190           while (!checkEOF()) {
191             skip(Integer.MAX_VALUE);
192           }
193         } finally {
194           closed = true;
195         }
196       }
197     }
198   }
199 
200   /**
201    * Chunk Encoder. Encoding the output data into a chain of chunks in the
202    * following sequences: -len1, byte[len1], -len2, byte[len2], ... len_n,
203    * byte[len_n]. Where len1, len2, ..., len_n are the lengths of the data
204    * chunks. Non-terminal chunks have their lengths negated. Non-terminal chunks
205    * cannot have length 0. All lengths are in the range of 0 to
206    * Integer.MAX_VALUE and are encoded in Utils.VInt format.
207    */
208   static public class ChunkEncoder extends OutputStream {
209     /**
210      * The data output stream it connects to.
211      */
212     private DataOutputStream out;
213 
214     /**
215      * The internal buffer that is only used when we do not know the advertised
216      * size.
217      */
218     private byte buf[];
219 
220     /**
221      * The number of valid bytes in the buffer. This value is always in the
222      * range <tt>0</tt> through <tt>buf.length</tt>; elements <tt>buf[0]</tt>
223      * through <tt>buf[count-1]</tt> contain valid byte data.
224      */
225     private int count;
226 
227     /**
228      * Constructor.
229      *
230      * @param out
231      *          the underlying output stream.
232      * @param buf
233      *          user-supplied buffer. The buffer would be used exclusively by
234      *          the ChunkEncoder during its life cycle.
235      */
ChunkEncoder(DataOutputStream out, byte[] buf)236     public ChunkEncoder(DataOutputStream out, byte[] buf) {
237       this.out = out;
238       this.buf = buf;
239       this.count = 0;
240     }
241 
242     /**
243      * Write out a chunk.
244      *
245      * @param chunk
246      *          The chunk buffer.
247      * @param offset
248      *          Offset to chunk buffer for the beginning of chunk.
249      * @param len
250      * @param last
251      *          Is this the last call to flushBuffer?
252      */
writeChunk(byte[] chunk, int offset, int len, boolean last)253     private void writeChunk(byte[] chunk, int offset, int len, boolean last)
254         throws IOException {
255       if (last) { // always write out the length for the last chunk.
256         Utils.writeVInt(out, len);
257         if (len > 0) {
258           out.write(chunk, offset, len);
259         }
260       } else {
261         if (len > 0) {
262           Utils.writeVInt(out, -len);
263           out.write(chunk, offset, len);
264         }
265       }
266     }
267 
268     /**
269      * Write out a chunk that is a concatenation of the internal buffer plus
270      * user supplied data. This will never be the last block.
271      *
272      * @param data
273      *          User supplied data buffer.
274      * @param offset
275      *          Offset to user data buffer.
276      * @param len
277      *          User data buffer size.
278      */
writeBufData(byte[] data, int offset, int len)279     private void writeBufData(byte[] data, int offset, int len)
280         throws IOException {
281       if (count + len > 0) {
282         Utils.writeVInt(out, -(count + len));
283         out.write(buf, 0, count);
284         count = 0;
285         out.write(data, offset, len);
286       }
287     }
288 
289     /**
290      * Flush the internal buffer.
291      *
292      * Is this the last call to flushBuffer?
293      *
294      * @throws java.io.IOException
295      */
flushBuffer()296     private void flushBuffer() throws IOException {
297       if (count > 0) {
298         writeChunk(buf, 0, count, false);
299         count = 0;
300       }
301     }
302 
303     @Override
write(int b)304     public void write(int b) throws IOException {
305       if (count >= buf.length) {
306         flushBuffer();
307       }
308       buf[count++] = (byte) b;
309     }
310 
311     @Override
write(byte b[])312     public void write(byte b[]) throws IOException {
313       write(b, 0, b.length);
314     }
315 
316     @Override
write(byte b[], int off, int len)317     public void write(byte b[], int off, int len) throws IOException {
318       if ((len + count) >= buf.length) {
319         /*
320          * If the input data do not fit in buffer, flush the output buffer and
321          * then write the data directly. In this way buffered streams will
322          * cascade harmlessly.
323          */
324         writeBufData(b, off, len);
325         return;
326       }
327 
328       System.arraycopy(b, off, buf, count, len);
329       count += len;
330     }
331 
332     @Override
flush()333     public void flush() throws IOException {
334       flushBuffer();
335       out.flush();
336     }
337 
338     @Override
close()339     public void close() throws IOException {
340       if (buf != null) {
341         try {
342           writeChunk(buf, 0, count, true);
343         } finally {
344           buf = null;
345           out = null;
346         }
347       }
348     }
349   }
350 
351   /**
352    * Encode the whole stream as a single chunk. Expecting to know the size of
353    * the chunk up-front.
354    */
355   static public class SingleChunkEncoder extends OutputStream {
356     /**
357      * The data output stream it connects to.
358      */
359     private final DataOutputStream out;
360 
361     /**
362      * The remaining bytes to be written.
363      */
364     private int remain;
365     private boolean closed = false;
366 
367     /**
368      * Constructor.
369      *
370      * @param out
371      *          the underlying output stream.
372      * @param size
373      *          The total # of bytes to be written as a single chunk.
374      * @throws java.io.IOException
375      *           if an I/O error occurs.
376      */
SingleChunkEncoder(DataOutputStream out, int size)377     public SingleChunkEncoder(DataOutputStream out, int size)
378         throws IOException {
379       this.out = out;
380       this.remain = size;
381       Utils.writeVInt(out, size);
382     }
383 
384     @Override
write(int b)385     public void write(int b) throws IOException {
386       if (remain > 0) {
387         out.write(b);
388         --remain;
389       } else {
390         throw new IOException("Writing more bytes than advertised size.");
391       }
392     }
393 
394     @Override
write(byte b[])395     public void write(byte b[]) throws IOException {
396       write(b, 0, b.length);
397     }
398 
399     @Override
write(byte b[], int off, int len)400     public void write(byte b[], int off, int len) throws IOException {
401       if (remain >= len) {
402         out.write(b, off, len);
403         remain -= len;
404       } else {
405         throw new IOException("Writing more bytes than advertised size.");
406       }
407     }
408 
409     @Override
flush()410     public void flush() throws IOException {
411       out.flush();
412     }
413 
414     @Override
close()415     public void close() throws IOException {
416       if (closed == true) {
417         return;
418       }
419 
420       try {
421         if (remain > 0) {
422           throw new IOException("Writing less bytes than advertised size.");
423         }
424       } finally {
425         closed = true;
426       }
427     }
428   }
429 }
430