1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.io.compress;
20 
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.io.InputStream;
24 
25 import org.apache.hadoop.classification.InterfaceAudience;
26 import org.apache.hadoop.classification.InterfaceStability;
27 import org.apache.hadoop.io.compress.Decompressor;
28 
29 @InterfaceAudience.Public
30 @InterfaceStability.Evolving
31 public class DecompressorStream extends CompressionInputStream {
32   protected Decompressor decompressor = null;
33   protected byte[] buffer;
34   protected boolean eof = false;
35   protected boolean closed = false;
36   private int lastBytesSent = 0;
37 
DecompressorStream(InputStream in, Decompressor decompressor, int bufferSize)38   public DecompressorStream(InputStream in, Decompressor decompressor,
39                             int bufferSize)
40   throws IOException {
41     super(in);
42 
43     if (decompressor == null) {
44       throw new NullPointerException();
45     } else if (bufferSize <= 0) {
46       throw new IllegalArgumentException("Illegal bufferSize");
47     }
48 
49     this.decompressor = decompressor;
50     buffer = new byte[bufferSize];
51   }
52 
DecompressorStream(InputStream in, Decompressor decompressor)53   public DecompressorStream(InputStream in, Decompressor decompressor)
54   throws IOException {
55     this(in, decompressor, 512);
56   }
57 
58   /**
59    * Allow derived classes to directly set the underlying stream.
60    *
61    * @param in Underlying input stream.
62    * @throws IOException
63    */
DecompressorStream(InputStream in)64   protected DecompressorStream(InputStream in) throws IOException {
65     super(in);
66   }
67 
68   private byte[] oneByte = new byte[1];
69   @Override
read()70   public int read() throws IOException {
71     checkStream();
72     return (read(oneByte, 0, oneByte.length) == -1) ? -1 : (oneByte[0] & 0xff);
73   }
74 
75   @Override
read(byte[] b, int off, int len)76   public int read(byte[] b, int off, int len) throws IOException {
77     checkStream();
78 
79     if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
80       throw new IndexOutOfBoundsException();
81     } else if (len == 0) {
82       return 0;
83     }
84 
85     return decompress(b, off, len);
86   }
87 
decompress(byte[] b, int off, int len)88   protected int decompress(byte[] b, int off, int len) throws IOException {
89     int n = 0;
90 
91     while ((n = decompressor.decompress(b, off, len)) == 0) {
92       if (decompressor.needsDictionary()) {
93         eof = true;
94         return -1;
95       }
96 
97       if (decompressor.finished()) {
98         // First see if there was any leftover buffered input from previous
99         // stream; if not, attempt to refill buffer.  If refill -> EOF, we're
100         // all done; else reset, fix up input buffer, and get ready for next
101         // concatenated substream/"member".
102         int nRemaining = decompressor.getRemaining();
103         if (nRemaining == 0) {
104           int m = getCompressedData();
105           if (m == -1) {
106             // apparently the previous end-of-stream was also end-of-file:
107             // return success, as if we had never called getCompressedData()
108             eof = true;
109             return -1;
110           }
111           decompressor.reset();
112           decompressor.setInput(buffer, 0, m);
113           lastBytesSent = m;
114         } else {
115           // looks like it's a concatenated stream:  reset low-level zlib (or
116           // other engine) and buffers, then "resend" remaining input data
117           decompressor.reset();
118           int leftoverOffset = lastBytesSent - nRemaining;
119           assert (leftoverOffset >= 0);
120           // this recopies userBuf -> direct buffer if using native libraries:
121           decompressor.setInput(buffer, leftoverOffset, nRemaining);
122           // NOTE:  this is the one place we do NOT want to save the number
123           // of bytes sent (nRemaining here) into lastBytesSent:  since we
124           // are resending what we've already sent before, offset is nonzero
125           // in general (only way it could be zero is if it already equals
126           // nRemaining), which would then screw up the offset calculation
127           // _next_ time around.  IOW, getRemaining() is in terms of the
128           // original, zero-offset bufferload, so lastBytesSent must be as
129           // well.  Cheesy ASCII art:
130           //
131           //          <------------ m, lastBytesSent ----------->
132           //          +===============================================+
133           // buffer:  |1111111111|22222222222222222|333333333333|     |
134           //          +===============================================+
135           //     #1:  <-- off -->|<-------- nRemaining --------->
136           //     #2:  <----------- off ----------->|<-- nRem. -->
137           //     #3:  (final substream:  nRemaining == 0; eof = true)
138           //
139           // If lastBytesSent is anything other than m, as shown, then "off"
140           // will be calculated incorrectly.
141         }
142       } else if (decompressor.needsInput()) {
143         int m = getCompressedData();
144         if (m == -1) {
145           throw new EOFException("Unexpected end of input stream");
146         }
147         decompressor.setInput(buffer, 0, m);
148         lastBytesSent = m;
149       }
150     }
151 
152     return n;
153   }
154 
getCompressedData()155   protected int getCompressedData() throws IOException {
156     checkStream();
157 
158     // note that the _caller_ is now required to call setInput() or throw
159     return in.read(buffer, 0, buffer.length);
160   }
161 
checkStream()162   protected void checkStream() throws IOException {
163     if (closed) {
164       throw new IOException("Stream closed");
165     }
166   }
167 
168   @Override
resetState()169   public void resetState() throws IOException {
170     decompressor.reset();
171   }
172 
173   private byte[] skipBytes = new byte[512];
174   @Override
skip(long n)175   public long skip(long n) throws IOException {
176     // Sanity checks
177     if (n < 0) {
178       throw new IllegalArgumentException("negative skip length");
179     }
180     checkStream();
181 
182     // Read 'n' bytes
183     int skipped = 0;
184     while (skipped < n) {
185       int len = Math.min(((int)n - skipped), skipBytes.length);
186       len = read(skipBytes, 0, len);
187       if (len == -1) {
188         eof = true;
189         break;
190       }
191       skipped += len;
192     }
193     return skipped;
194   }
195 
196   @Override
available()197   public int available() throws IOException {
198     checkStream();
199     return (eof) ? 0 : 1;
200   }
201 
202   @Override
close()203   public void close() throws IOException {
204     if (!closed) {
205       in.close();
206       closed = true;
207     }
208   }
209 
210   @Override
markSupported()211   public boolean markSupported() {
212     return false;
213   }
214 
215   @Override
mark(int readlimit)216   public synchronized void mark(int readlimit) {
217   }
218 
219   @Override
reset()220   public synchronized void reset() throws IOException {
221     throw new IOException("mark/reset not supported");
222   }
223 
224 }
225