/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.compress.Decompressor;

/**
 * An input stream that reads compressed bytes from an underlying stream,
 * hands them to a {@link Decompressor}, and returns the decompressed bytes
 * to the caller.
 *
 * <p>Compressed input is staged in {@link #buffer}; when the decompressor
 * reports that it has finished one substream, the stream resets it and keeps
 * going with whatever input follows, so concatenated substreams are read as
 * one continuous stream. Mark/reset is not supported.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DecompressorStream extends CompressionInputStream {
  /** Engine that turns the compressed bytes in {@link #buffer} into output. */
  protected Decompressor decompressor = null;
  /** Staging area for compressed bytes read from the underlying stream. */
  protected byte[] buffer;
  /** Set once the end of the decompressed data has been reached. */
  protected boolean eof = false;
  /** Set once {@link #close()} has completed. */
  protected boolean closed = false;
  /**
   * Size of the most recent full bufferload handed to the decompressor at
   * offset zero; used to locate leftover input after a substream ends.
   */
  private int lastBytesSent = 0;

  /**
   * Creates a stream that decompresses {@code in} using {@code decompressor}.
   *
   * @param in underlying stream of compressed data
   * @param decompressor decompressor to use; must not be null
   * @param bufferSize size in bytes of the compressed-input buffer; must be
   *                   positive
   * @throws NullPointerException if {@code decompressor} is null
   * @throws IllegalArgumentException if {@code bufferSize} is not positive
   * @throws IOException declared for the superclass constructor
   */
  public DecompressorStream(InputStream in, Decompressor decompressor,
                            int bufferSize)
  throws IOException {
    super(in);

    if (decompressor == null) {
      throw new NullPointerException();
    } else if (bufferSize <= 0) {
      throw new IllegalArgumentException("Illegal bufferSize");
    }

    this.decompressor = decompressor;
    buffer = new byte[bufferSize];
  }

  /**
   * Creates a stream with a 512-byte compressed-input buffer.
   *
   * @param in underlying stream of compressed data
   * @param decompressor decompressor to use; must not be null
   * @throws IOException declared for the superclass constructor
   */
  public DecompressorStream(InputStream in, Decompressor decompressor)
  throws IOException {
    this(in, decompressor, 512);
  }

  /**
   * Allow derived classes to directly set the underlying stream.
   * This constructor leaves {@link #decompressor} and {@link #buffer} unset.
   *
   * @param in Underlying input stream.
   * @throws IOException declared for the superclass constructor
   */
  protected DecompressorStream(InputStream in) throws IOException {
    super(in);
  }

  /** Scratch buffer backing the single-byte {@link #read()}. */
  private final byte[] oneByte = new byte[1];

  /**
   * Reads one decompressed byte.
   *
   * @return the byte as a value in {@code 0..255}, or -1 at end of stream
   * @throws IOException if the stream is closed or reading fails
   */
  @Override
  public int read() throws IOException {
    checkStream();
    return (read(oneByte, 0, oneByte.length) == -1) ? -1 : (oneByte[0] & 0xff);
  }

  /**
   * Reads decompressed bytes into {@code b}.
   *
   * @param b destination array
   * @param off offset in {@code b} at which to start storing bytes
   * @param len maximum number of bytes to read
   * @return the number of bytes stored, 0 if {@code len} is 0, or -1 at end
   *         of stream
   * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not
   *         describe a valid range of {@code b}
   * @throws IOException if the stream is closed or reading fails
   */
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    checkStream();

    // A single sign test covers negative off, negative len, int overflow of
    // off + len, and a range that runs past the end of b.
    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
      throw new IndexOutOfBoundsException();
    } else if (len == 0) {
      return 0;
    }

    return decompress(b, off, len);
  }

  /**
   * Drives the decompressor until it produces output, refilling its input
   * from the underlying stream as required.
   *
   * @param b destination array
   * @param off offset in {@code b} at which to start storing bytes
   * @param len maximum number of bytes to produce
   * @return the number of bytes produced, or -1 at end of data
   * @throws EOFException if the underlying stream ends while the
   *         decompressor still needs input
   * @throws IOException if the stream is closed or reading fails
   */
  protected int decompress(byte[] b, int off, int len) throws IOException {
    int n = 0;

    while ((n = decompressor.decompress(b, off, len)) == 0) {
      if (decompressor.needsDictionary()) {
        // The decompressor asks for a dictionary; this stream treats that
        // as the end of the data.
        eof = true;
        return -1;
      }

      if (decompressor.finished()) {
        // First see if there was any leftover buffered input from previous
        // stream; if not, attempt to refill buffer.  If refill -> EOF, we're
        // all done; else reset, fix up input buffer, and get ready for next
        // concatenated substream/"member".
        int nRemaining = decompressor.getRemaining();
        if (nRemaining == 0) {
          int m = getCompressedData();
          if (m == -1) {
            // apparently the previous end-of-stream was also end-of-file:
            // return success, as if we had never called getCompressedData()
            eof = true;
            return -1;
          }
          decompressor.reset();
          decompressor.setInput(buffer, 0, m);
          lastBytesSent = m;
        } else {
          // looks like it's a concatenated stream:  reset low-level zlib (or
          // other engine) and buffers, then "resend" remaining input data
          decompressor.reset();
          int leftoverOffset = lastBytesSent - nRemaining;
          assert (leftoverOffset >= 0);
          // this recopies userBuf -> direct buffer if using native libraries:
          decompressor.setInput(buffer, leftoverOffset, nRemaining);
          // NOTE:  this is the one place we do NOT want to save the number
          // of bytes sent (nRemaining here) into lastBytesSent:  since we
          // are resending what we've already sent before, offset is nonzero
          // in general (only way it could be zero is if it already equals
          // nRemaining), which would then screw up the offset calculation
          // _next_ time around.  IOW, getRemaining() is in terms of the
          // original, zero-offset bufferload, so lastBytesSent must be as
          // well.  Cheesy ASCII art:
          //
          //          <------------ m, lastBytesSent ----------->
          //          +===============================================+
          // buffer:  |1111111111|22222222222222222|333333333333|     |
          //          +===============================================+
          //     #1:  <-- off -->|<-------- nRemaining --------->
          //     #2:  <----------- off ----------->|<-- nRem. -->
          //     #3:  (final substream:  nRemaining == 0; eof = true)
          //
          // If lastBytesSent is anything other than m, as shown, then "off"
          // will be calculated incorrectly.
        }
      } else if (decompressor.needsInput()) {
        int m = getCompressedData();
        if (m == -1) {
          throw new EOFException("Unexpected end of input stream");
        }
        decompressor.setInput(buffer, 0, m);
        lastBytesSent = m;
      }
    }

    return n;
  }

  /**
   * Reads the next bufferload of compressed bytes from the underlying stream
   * into {@link #buffer}.
   *
   * @return the number of bytes read, or -1 if the underlying stream is at
   *         its end
   * @throws IOException if the stream is closed or reading fails
   */
  protected int getCompressedData() throws IOException {
    checkStream();

    // note that the _caller_ is now required to call setInput() or throw
    return in.read(buffer, 0, buffer.length);
  }

  /**
   * Verifies that this stream has not been closed.
   *
   * @throws IOException if the stream is closed
   */
  protected void checkStream() throws IOException {
    if (closed) {
      throw new IOException("Stream closed");
    }
  }

  /**
   * Resets the decompressor.
   *
   * @throws IOException declared by the overridden method
   */
  @Override
  public void resetState() throws IOException {
    decompressor.reset();
  }

  /** Scratch buffer that receives (and discards) skipped bytes. */
  private final byte[] skipBytes = new byte[512];

  /**
   * Skips up to {@code n} decompressed bytes by reading and discarding them.
   *
   * @param n number of bytes to skip; must not be negative
   * @return the number of bytes actually skipped, which is less than
   *         {@code n} only if end of stream was reached first
   * @throws IllegalArgumentException if {@code n} is negative
   * @throws IOException if the stream is closed or reading fails
   */
  @Override
  public long skip(long n) throws IOException {
    // Sanity checks
    if (n < 0) {
      throw new IllegalArgumentException("negative skip length");
    }
    checkStream();

    // Read 'n' bytes. Progress is tracked as a long and the chunk size is
    // clamped before narrowing, so requests above Integer.MAX_VALUE neither
    // wrap to a negative/zero length nor spin forever.
    long skipped = 0;
    while (skipped < n) {
      int len = (int) Math.min(n - skipped, (long) skipBytes.length);
      len = read(skipBytes, 0, len);
      if (len == -1) {
        eof = true;
        break;
      }
      skipped += len;
    }
    return skipped;
  }

  /**
   * Reports whether more data may be available.
   *
   * @return 0 once end of stream has been seen, otherwise 1
   * @throws IOException if the stream is closed
   */
  @Override
  public int available() throws IOException {
    checkStream();
    return (eof) ? 0 : 1;
  }

  /**
   * Closes the underlying stream and marks this stream closed. Calling it
   * again after a successful close does nothing.
   *
   * @throws IOException if closing the underlying stream fails
   */
  @Override
  public void close() throws IOException {
    if (!closed) {
      in.close();
      closed = true;
    }
  }

  /**
   * @return {@code false}; mark/reset is not supported
   */
  @Override
  public boolean markSupported() {
    return false;
  }

  /**
   * Does nothing; mark/reset is not supported.
   *
   * @param readlimit ignored
   */
  @Override
  public synchronized void mark(int readlimit) {
  }

  /**
   * Always fails; mark/reset is not supported.
   *
   * @throws IOException always
   */
  @Override
  public synchronized void reset() throws IOException {
    throw new IOException("mark/reset not supported");
  }

}